from __future__ import absolute_import, division, print_function, unicode_literals

import math

import torch

__all__ = [
    "istft",
    "spectrogram",
    "amplitude_to_DB",
    "create_fb_matrix",
    "create_dct",
    "mu_law_encoding",
    "mu_law_decoding",
    "complex_norm",
    "angle",
    "magphase",
    "phase_vocoder",
    "lfilter",
    "lowpass_biquad",
    "highpass_biquad",
    "equalizer_biquad",
    "biquad",
    "mask_along_axis",
    "mask_along_axis_iid",
]


# TODO: remove this once https://github.com/pytorch/pytorch/issues/21478 gets solved
@torch.jit.ignore
def _stft(
    waveform,
    n_fft,
    hop_length,
    win_length,
    window,
    center,
    pad_mode,
    normalized,
    onesided,
):
    # type: (Tensor, int, Optional[int], Optional[int], Optional[Tensor], bool, str, bool, bool) -> Tensor
    return torch.stft(
        waveform,
        n_fft,
        hop_length,
        win_length,
        window,
        center,
        pad_mode,
        normalized,
        onesided,
    )


def istft(
    stft_matrix,  # type: Tensor
    n_fft,  # type: int
    hop_length=None,  # type: Optional[int]
    win_length=None,  # type: Optional[int]
    window=None,  # type: Optional[Tensor]
    center=True,  # type: bool
    pad_mode="reflect",  # type: str
    normalized=False,  # type: bool
    onesided=True,  # type: bool
    length=None,  # type: Optional[int]
):
    # type: (...) -> Tensor
    r"""Inverse short time Fourier Transform. This is expected to be the inverse of torch.stft.
    It has the same parameters (plus an additional optional parameter, ``length``) and returns the
    least squares estimation of the original signal. The algorithm checks the NOLA
    (nonzero overlap-add) condition.

    An important consideration for the parameters ``window`` and ``center`` is that the envelope
    created by the summation of all the windows must never be zero at any point in time. Specifically,
    :math:`\sum_{t=-\infty}^{\infty} w^2[n - t \times hop\_length] \neq 0`.

    Since stft discards elements at the end of the signal if they do not fit in a frame, istft
    may return a shorter signal than the original signal (this can occur if ``center`` is False,
    since the signal isn't padded).

    If ``center`` is True, then there will be padding, e.g. 'constant', 'reflect', etc. Left padding
    can be trimmed off exactly because it can be calculated, but right padding cannot be calculated
    without additional information.

    Example: Suppose the last window is:
    [17, 18, 0, 0, 0] vs [18, 0, 0, 0, 0]

    The n_frame, hop_length, win_length are all the same, which prevents the calculation of right padding.
    These additional values could be zeros or a reflection of the signal, so providing ``length``
    could be useful. If ``length`` is ``None`` then padding will be aggressively removed
    (some loss of signal).

    [1] D. W. Griffin and J. S. Lim, "Signal estimation from modified short-time Fourier transform,"
    IEEE Trans. ASSP, vol.32, no.2, pp.236-243, Apr. 1984.

    Args:
        stft_matrix (torch.Tensor): Output of stft where each row of a channel is a frequency and each
            column is a window. It has a size of (..., fft_size, n_frame, 2).
        n_fft (int): Size of Fourier transform
        hop_length (Optional[int]): The distance between neighboring sliding window frames.
            (Default: ``win_length // 4``)
        win_length (Optional[int]): The size of window frame and STFT filter. (Default: ``n_fft``)
        window (Optional[torch.Tensor]): The optional window function.
            (Default: ``torch.ones(win_length)``)
        center (bool): Whether ``input`` was padded on both sides so
            that the :math:`t`-th frame is centered at time :math:`t \times \text{hop\_length}`.
            (Default: ``True``)
        pad_mode (str): Controls the padding method used when ``center`` is True. (Default:
            ``'reflect'``)
        normalized (bool): Whether the STFT was normalized. (Default: ``False``)
        onesided (bool): Whether the STFT is onesided. (Default: ``True``)
        length (Optional[int]): The amount to trim the signal by (i.e. the
            original signal length). (Default: whole signal)

    Returns:
        torch.Tensor: Least squares estimation of the original signal of size (..., signal_length)
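
    Example
        A minimal round-trip sketch (the signal and parameter values are
        illustrative):

        >>> waveform = torch.randn(1, 16000)
        >>> n_fft = 400
        >>> stft_matrix = torch.stft(waveform, n_fft)
        >>> reconstructed = istft(stft_matrix, n_fft, length=waveform.size(-1))
        >>> reconstructed.shape
        torch.Size([1, 16000])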
    """
    stft_matrix_dim = stft_matrix.dim()
    assert 3 <= stft_matrix_dim, "Incorrect stft dimension: %d" % (stft_matrix_dim)
    assert stft_matrix.nelement() > 0

    if stft_matrix_dim == 3:
        # add a channel dimension
        stft_matrix = stft_matrix.unsqueeze(0)

    # pack batch
    shape = stft_matrix.size()
    stft_matrix = stft_matrix.reshape(-1, *shape[-3:])

    dtype = stft_matrix.dtype
    device = stft_matrix.device
    fft_size = stft_matrix.size(1)
    assert (onesided and n_fft // 2 + 1 == fft_size) or (
        not onesided and n_fft == fft_size
    ), (
        "one_sided implies that n_fft // 2 + 1 == fft_size and not one_sided implies n_fft == fft_size. "
        + "Given values were onesided: %s, n_fft: %d, fft_size: %d"
        % ("True" if onesided else False, n_fft, fft_size)
    )

    # use stft defaults for Optionals
    if win_length is None:
        win_length = n_fft

    if hop_length is None:
        hop_length = int(win_length // 4)

    # There must be overlap
    assert 0 < hop_length <= win_length
    assert 0 < win_length <= n_fft

    if window is None:
        window = torch.ones(win_length, requires_grad=False, device=device, dtype=dtype)

    assert window.dim() == 1 and window.size(0) == win_length

    if win_length != n_fft:
        # center window with pad left and right zeros
        left = (n_fft - win_length) // 2
        window = torch.nn.functional.pad(window, (left, n_fft - win_length - left))
        assert window.size(0) == n_fft
    # win_length and n_fft are synonymous from here on

    stft_matrix = stft_matrix.transpose(1, 2)  # size (channel, n_frame, fft_size, 2)
    stft_matrix = torch.irfft(
        stft_matrix, 1, normalized, onesided, signal_sizes=(n_fft,)
    )  # size (channel, n_frame, n_fft)

    assert stft_matrix.size(2) == n_fft
    n_frame = stft_matrix.size(1)

    ytmp = stft_matrix * window.view(1, 1, n_fft)  # size (channel, n_frame, n_fft)
    # each column of a channel is a frame which needs to be overlap added at the right place
    ytmp = ytmp.transpose(1, 2)  # size (channel, n_fft, n_frame)

    eye = torch.eye(n_fft, requires_grad=False, device=device, dtype=dtype).unsqueeze(
        1
    )  # size (n_fft, 1, n_fft)

    # this does overlap add where the frames of ytmp are added such that the i'th frame of
    # ytmp is added starting at i*hop_length in the output
    y = torch.nn.functional.conv_transpose1d(
        ytmp, eye, stride=hop_length, padding=0
    )  # size (channel, 1, expected_signal_len)

    # do the same for the window function
    window_sq = (
        window.pow(2).view(n_fft, 1).repeat((1, n_frame)).unsqueeze(0)
    )  # size (1, n_fft, n_frame)
    window_envelop = torch.nn.functional.conv_transpose1d(
        window_sq, eye, stride=hop_length, padding=0
    )  # size (1, 1, expected_signal_len)

    expected_signal_len = n_fft + hop_length * (n_frame - 1)
    assert y.size(2) == expected_signal_len
    assert window_envelop.size(2) == expected_signal_len

    half_n_fft = n_fft // 2
    # we need to trim the front padding away if center
    start = half_n_fft if center else 0
    end = -half_n_fft if length is None else start + length

    y = y[:, :, start:end]
    window_envelop = window_envelop[:, :, start:end]

    # check NOLA non-zero overlap condition
    window_envelop_lowest = window_envelop.abs().min()
    assert window_envelop_lowest > 1e-11, "window overlap add min: %f" % (
        window_envelop_lowest
    )

    y = (y / window_envelop).squeeze(1)  # size (channel, trimmed_signal_len)

    # unpack batch
    y = y.reshape(shape[:-3] + y.shape[-1:])

    if stft_matrix_dim == 3:  # remove the channel dimension
        y = y.squeeze(0)

    return y


def spectrogram(
    waveform, pad, window, n_fft, hop_length, win_length, power, normalized
):
    # type: (Tensor, int, Tensor, int, int, int, Optional[int], bool) -> Tensor
    r"""Create a spectrogram or a batch of spectrograms from a raw audio signal.
    The spectrogram can be either magnitude-only or complex.

    Args:
        waveform (torch.Tensor): Tensor of audio of dimension (..., channel, time)
        pad (int): Two sided padding of signal
        window (torch.Tensor): Window tensor that is applied/multiplied to each frame/window
        n_fft (int): Size of FFT
        hop_length (int): Length of hop between STFT windows
        win_length (int): Window size
        power (Optional[int]): Exponent for the magnitude spectrogram,
            (must be > 0) e.g., 1 for energy, 2 for power, etc.
            If None, then the complex spectrum is returned instead.
        normalized (bool): Whether to normalize by magnitude after stft

    Returns:
        torch.Tensor: Dimension (..., channel, freq, time), where channel
        is unchanged, freq is ``n_fft // 2 + 1`` and ``n_fft`` is the number of
        Fourier bins, and time is the number of window hops (n_frame).
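
    Example
        A small sketch with a Hann window (parameter values are illustrative):

        >>> waveform = torch.randn(1, 16000)
        >>> n_fft = 400
        >>> window = torch.hann_window(n_fft)
        >>> spec = spectrogram(waveform, 0, window, n_fft, n_fft // 2, n_fft, 2, False)
        >>> spec.shape
        torch.Size([1, 201, 81])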
    """

    if pad > 0:
        # TODO add "with torch.no_grad():" back when JIT supports it
        waveform = torch.nn.functional.pad(waveform, (pad, pad), "constant")

    # pack batch
    shape = waveform.size()
    waveform = waveform.reshape(-1, shape[-1])

    # default values are consistent with librosa.core.spectrum._spectrogram
    spec_f = _stft(
        waveform, n_fft, hop_length, win_length, window, True, "reflect", False, True
    )

    # unpack batch
    spec_f = spec_f.reshape(shape[:-1] + spec_f.shape[-3:])

    if normalized:
        spec_f /= window.pow(2).sum().sqrt()
    if power is not None:
        # norm of the "complex" tensor; power=1 gives magnitude, power=2 gives power
        spec_f = spec_f.pow(2.0).sum(-1).pow(0.5 * power)

    return spec_f


def amplitude_to_DB(x, multiplier, amin, db_multiplier, top_db=None):
    # type: (Tensor, float, float, float, Optional[float]) -> Tensor
    r"""Turn a tensor from the power/amplitude scale to the decibel scale.

    This output depends on the maximum value in the input tensor, and so
    may return different values for an audio clip split into snippets vs. a
    full clip.

    Args:
        x (torch.Tensor): Input tensor before being converted to decibel scale
        multiplier (float): Use 10. for power and 20. for amplitude
        amin (float): Number to clamp ``x``
        db_multiplier (float): Log10(max(reference value and amin))
        top_db (Optional[float]): Minimum negative cut-off in decibels. A reasonable number
            is 80. (Default: ``None``)

    Returns:
        torch.Tensor: Output tensor in decibel scale
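
    Example
        A sketch converting a power spectrogram to decibels; ``db_multiplier``
        of 0.0 corresponds to a reference value of 1.0 (values are illustrative):

        >>> specgram = torch.rand(1, 201, 100)
        >>> specgram_db = amplitude_to_DB(specgram, 10.0, 1e-10, 0.0, top_db=80.0)
        >>> specgram_db.shape
        torch.Size([1, 201, 100])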
    """
    x_db = multiplier * torch.log10(torch.clamp(x, min=amin))
    x_db -= multiplier * db_multiplier

    if top_db is not None:
        x_db = x_db.clamp(min=x_db.max().item() - top_db)

    return x_db


def create_fb_matrix(n_freqs, f_min, f_max, n_mels, sample_rate):
    # type: (int, float, float, int, int) -> Tensor
    r"""Create a frequency bin conversion matrix.

    Args:
        n_freqs (int): Number of frequencies to highlight/apply
        f_min (float): Minimum frequency (Hz)
        f_max (float): Maximum frequency (Hz)
        n_mels (int): Number of mel filterbanks
        sample_rate (int): Sample rate of the audio waveform

    Returns:
        torch.Tensor: Triangular filter banks (fb matrix) of size (``n_freqs``, ``n_mels``)
        i.e. the number of frequencies to highlight/apply by the number of filterbanks.
        Each column is a filterbank so that assuming there is a matrix A of
        size (..., ``n_freqs``), the applied result would be
        ``A * create_fb_matrix(A.size(-1), ...)``.
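
    Example
        A sketch applying the filterbank to a spectrogram (shapes and
        frequencies are illustrative):

        >>> specgram = torch.rand(1, 201, 100)  # (channel, n_freqs, time)
        >>> fb = create_fb_matrix(201, 0.0, 8000.0, 128, 16000)
        >>> mel_specgram = torch.matmul(specgram.transpose(1, 2), fb).transpose(1, 2)
        >>> mel_specgram.shape
        torch.Size([1, 128, 100])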
    """
    # freq bins
    # Equivalent filterbank construction by Librosa
    all_freqs = torch.linspace(0, sample_rate // 2, n_freqs)
    i_freqs = all_freqs.ge(f_min) & all_freqs.le(f_max)
    freqs = all_freqs[i_freqs]

    # calculate mel freq bins
    # hertz to mel(f) is 2595. * math.log10(1. + (f / 700.))
    m_min = 2595.0 * math.log10(1.0 + (f_min / 700.0))
    m_max = 2595.0 * math.log10(1.0 + (f_max / 700.0))
    m_pts = torch.linspace(m_min, m_max, n_mels + 2)
    # mel to hertz(mel) is 700. * (10**(mel / 2595.) - 1.)
    f_pts = 700.0 * (10 ** (m_pts / 2595.0) - 1.0)
    # calculate the difference between each mel point and each stft freq point in hertz
    f_diff = f_pts[1:] - f_pts[:-1]  # (n_mels + 1)
    slopes = f_pts.unsqueeze(0) - all_freqs.unsqueeze(1)  # (n_freqs, n_mels + 2)
    # create overlapping triangles
    zero = torch.zeros(1)
    down_slopes = (-1.0 * slopes[:, :-2]) / f_diff[:-1]  # (n_freqs, n_mels)
    up_slopes = slopes[:, 2:] / f_diff[1:]  # (n_freqs, n_mels)
    fb = torch.max(zero, torch.min(down_slopes, up_slopes))
    return fb


def create_dct(n_mfcc, n_mels, norm):
    # type: (int, int, Optional[str]) -> Tensor
    r"""Create a DCT transformation matrix with shape (``n_mels``, ``n_mfcc``),
    normalized depending on norm.

    Args:
        n_mfcc (int): Number of mfc coefficients to retain
        n_mels (int): Number of mel filterbanks
        norm (Optional[str]): Norm to use (either 'ortho' or None)

    Returns:
        torch.Tensor: The transformation matrix of size (``n_mels``, ``n_mfcc``),
        to be right-multiplied to row-wise data.
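
    Example
        A sketch producing MFCC-like coefficients from a mel spectrogram
        (shapes are illustrative):

        >>> mel_specgram = torch.rand(1, 100, 128)  # (channel, time, n_mels)
        >>> dct_mat = create_dct(40, 128, "ortho")
        >>> mfcc = torch.matmul(mel_specgram, dct_mat)
        >>> mfcc.shape
        torch.Size([1, 100, 40])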
    """
    # http://en.wikipedia.org/wiki/Discrete_cosine_transform#DCT-II
    n = torch.arange(float(n_mels))
    k = torch.arange(float(n_mfcc)).unsqueeze(1)
    dct = torch.cos(math.pi / float(n_mels) * (n + 0.5) * k)  # size (n_mfcc, n_mels)
    if norm is None:
        dct *= 2.0
    else:
        assert norm == "ortho"
        dct[0] *= 1.0 / math.sqrt(2.0)
        dct *= math.sqrt(2.0 / float(n_mels))
    return dct.t()


def mu_law_encoding(x, quantization_channels):
    # type: (Tensor, int) -> Tensor
    r"""Encode signal based on mu-law companding.  For more info see the
    `Wikipedia Entry <https://en.wikipedia.org/wiki/%CE%9C-law_algorithm>`_

    This algorithm assumes the signal has been scaled to between -1 and 1 and
    returns a signal encoded with values from 0 to quantization_channels - 1.

    Args:
        x (torch.Tensor): Input tensor
        quantization_channels (int): Number of channels

    Returns:
        torch.Tensor: Input after mu-law encoding
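
    Example
        A round-trip sketch with 256 channels (input values are illustrative):

        >>> x = torch.linspace(-1.0, 1.0, 5)
        >>> x_mu = mu_law_encoding(x, 256)
        >>> torch.allclose(mu_law_decoding(x_mu, 256), x, atol=1e-2)
        True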
    """
    mu = quantization_channels - 1.0
    if not x.is_floating_point():
        x = x.to(torch.float)
    mu = torch.tensor(mu, dtype=x.dtype)
    x_mu = torch.sign(x) * torch.log1p(mu * torch.abs(x)) / torch.log1p(mu)
    x_mu = ((x_mu + 1) / 2 * mu + 0.5).to(torch.int64)
    return x_mu


def mu_law_decoding(x_mu, quantization_channels):
    # type: (Tensor, int) -> Tensor
    r"""Decode mu-law encoded signal.  For more info see the
    `Wikipedia Entry <https://en.wikipedia.org/wiki/%CE%9C-law_algorithm>`_

    This expects an input with values between 0 and quantization_channels - 1
    and returns a signal scaled between -1 and 1.

    Args:
        x_mu (torch.Tensor): Input tensor
        quantization_channels (int): Number of channels

    Returns:
        torch.Tensor: Input after mu-law decoding
    """
    mu = quantization_channels - 1.0
    if not x_mu.is_floating_point():
        x_mu = x_mu.to(torch.float)
    mu = torch.tensor(mu, dtype=x_mu.dtype)
    x = (x_mu / mu) * 2 - 1.0
    x = torch.sign(x) * (torch.exp(torch.abs(x) * torch.log1p(mu)) - 1.0) / mu
    return x


def complex_norm(complex_tensor, power=1.0):
    # type: (Tensor, float) -> Tensor
    r"""Compute the norm of complex tensor input.

    Args:
        complex_tensor (torch.Tensor): Tensor shape of `(..., complex=2)`
        power (float): Power of the norm. (Default: `1.0`).

    Returns:
        torch.Tensor: Power of the normed input tensor. Shape of `(..., )`
    """
    if power == 1.0:
        return torch.norm(complex_tensor, 2, -1)
    return torch.norm(complex_tensor, 2, -1).pow(power)


def angle(complex_tensor):
    # type: (Tensor) -> Tensor
    r"""Compute the angle of complex tensor input.

    Args:
        complex_tensor (torch.Tensor): Tensor shape of `(..., complex=2)`

    Returns:
        torch.Tensor: Angle of a complex tensor. Shape of `(..., )`
    """
    return torch.atan2(complex_tensor[..., 1], complex_tensor[..., 0])


def magphase(complex_tensor, power=1.0):
    # type: (Tensor, float) -> Tuple[Tensor, Tensor]
    r"""Separate a complex-valued spectrogram with shape `(..., 2)` into its magnitude and phase.

    Args:
        complex_tensor (torch.Tensor): Tensor shape of `(..., complex=2)`
        power (float): Power of the norm. (Default: `1.0`)

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: The magnitude and phase of the complex tensor
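
    Example
        A sketch on a random "complex" spectrogram (shapes are illustrative):

        >>> specgram = torch.randn(1, 201, 100, 2)
        >>> mag, phase = magphase(specgram)
        >>> mag.shape, phase.shape
        (torch.Size([1, 201, 100]), torch.Size([1, 201, 100]))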
    """
    mag = complex_norm(complex_tensor, power)
    phase = angle(complex_tensor)
    return mag, phase


def phase_vocoder(complex_specgrams, rate, phase_advance):
    # type: (Tensor, float, Tensor) -> Tensor
    r"""Given an STFT tensor, speed up in time without modifying pitch by a
    factor of ``rate``.

    Args:
        complex_specgrams (torch.Tensor): Dimension of `(..., freq, time, complex=2)`
        rate (float): Speed-up factor
        phase_advance (torch.Tensor): Expected phase advance in each bin. Dimension
            of (freq, 1)

    Returns:
        complex_specgrams_stretch (torch.Tensor): Dimension of `(...,
        freq, ceil(time/rate), complex=2)`

    Example
        >>> freq, hop_length = 1025, 512
        >>> # (channel, freq, time, complex=2)
        >>> complex_specgrams = torch.randn(2, freq, 300, 2)
        >>> rate = 1.3 # Speed up by 30%
        >>> phase_advance = torch.linspace(
        >>>    0, math.pi * hop_length, freq)[..., None]
        >>> x = phase_vocoder(complex_specgrams, rate, phase_advance)
        >>> x.shape # with 231 == ceil(300 / 1.3)
        torch.Size([2, 1025, 231, 2])
    """

    # pack batch
    shape = complex_specgrams.size()
    complex_specgrams = complex_specgrams.reshape([-1] + list(shape[-3:]))

    time_steps = torch.arange(0,
                              complex_specgrams.size(-2),
                              rate,
                              device=complex_specgrams.device,
                              dtype=complex_specgrams.dtype)

    alphas = time_steps % 1.0
    phase_0 = angle(complex_specgrams[..., :1, :])

    # Time Padding
    complex_specgrams = torch.nn.functional.pad(complex_specgrams, [0, 0, 0, 2])

    # (new_bins, freq, 2)
    complex_specgrams_0 = complex_specgrams.index_select(-2, time_steps.long())
    complex_specgrams_1 = complex_specgrams.index_select(-2, (time_steps + 1).long())

    angle_0 = angle(complex_specgrams_0)
    angle_1 = angle(complex_specgrams_1)

    norm_0 = torch.norm(complex_specgrams_0, p=2, dim=-1)
    norm_1 = torch.norm(complex_specgrams_1, p=2, dim=-1)

    phase = angle_1 - angle_0 - phase_advance
    phase = phase - 2 * math.pi * torch.round(phase / (2 * math.pi))

    # Compute Phase Accum
    phase = phase + phase_advance
    phase = torch.cat([phase_0, phase[..., :-1]], dim=-1)
    phase_acc = torch.cumsum(phase, -1)

    mag = alphas * norm_1 + (1 - alphas) * norm_0

    real_stretch = mag * torch.cos(phase_acc)
    imag_stretch = mag * torch.sin(phase_acc)

    complex_specgrams_stretch = torch.stack([real_stretch, imag_stretch], dim=-1)

    # unpack batch
    complex_specgrams_stretch = complex_specgrams_stretch.reshape(shape[:-3] + complex_specgrams_stretch.shape[1:])

    return complex_specgrams_stretch


def lfilter(waveform, a_coeffs, b_coeffs):
    # type: (Tensor, Tensor, Tensor) -> Tensor
    r"""Perform an IIR filter by evaluating difference equation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(..., time)`.  Must be normalized to -1 to 1.
        a_coeffs (torch.Tensor): denominator coefficients of difference equation of dimension of `(n_order + 1)`.
                                Lower delay coefficients are first, e.g. `[a0, a1, a2, ...]`.
                                Must be same size as b_coeffs (pad with 0's as necessary).
        b_coeffs (torch.Tensor): numerator coefficients of difference equation of dimension of `(n_order + 1)`.
                                 Lower delay coefficients are first, e.g. `[b0, b1, b2, ...]`.
                                 Must be same size as a_coeffs (pad with 0's as necessary).

    Returns:
        output_waveform (torch.Tensor): Dimension of `(..., time)`.  Output will be clipped to -1 to 1.
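
    Example
        A sketch applying a two-tap averaging (FIR) filter; the coefficients
        are illustrative:

        >>> waveform = torch.rand(1, 8000) * 2 - 1
        >>> a_coeffs = torch.tensor([1.0, 0.0, 0.0])
        >>> b_coeffs = torch.tensor([0.5, 0.5, 0.0])
        >>> filtered = lfilter(waveform, a_coeffs, b_coeffs)
        >>> filtered.shape
        torch.Size([1, 8000])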

    """

    # pack batch
    shape = waveform.size()
    waveform = waveform.reshape(-1, shape[-1])

    assert(a_coeffs.size(0) == b_coeffs.size(0))
    assert(len(waveform.size()) == 2)
    assert(waveform.device == a_coeffs.device)
    assert(b_coeffs.device == a_coeffs.device)

    device = waveform.device
    dtype = waveform.dtype
    n_channel, n_sample = waveform.size()
    n_order = a_coeffs.size(0)
    assert(n_order > 0)

    # Pad the input and create output
    padded_waveform = torch.zeros(n_channel, n_sample + n_order - 1, dtype=dtype, device=device)
    padded_waveform[:, (n_order - 1):] = waveform
    padded_output_waveform = torch.zeros(n_channel, n_sample + n_order - 1, dtype=dtype, device=device)

    # Set up the coefficients matrix
    # Flip order, repeat, and transpose
    a_coeffs_filled = a_coeffs.flip(0).repeat(n_channel, 1).t()
    b_coeffs_filled = b_coeffs.flip(0).repeat(n_channel, 1).t()

    # Set up a few other utilities
    a0_repeated = torch.ones(n_channel, dtype=dtype, device=device) * a_coeffs[0]
    ones = torch.ones(n_channel, n_sample, dtype=dtype, device=device)

    for i_sample in range(n_sample):

        o0 = torch.zeros(n_channel, dtype=dtype, device=device)

        windowed_input_signal = padded_waveform[:, i_sample:(i_sample + n_order)]
        windowed_output_signal = padded_output_waveform[:, i_sample:(i_sample + n_order)]

        o0.add_(torch.diag(torch.mm(windowed_input_signal, b_coeffs_filled)))
        o0.sub_(torch.diag(torch.mm(windowed_output_signal, a_coeffs_filled)))

        o0.div_(a0_repeated)

        padded_output_waveform[:, i_sample + n_order - 1] = o0

    output = torch.min(
        ones, torch.max(ones * -1, padded_output_waveform[:, (n_order - 1):])
    )

    # unpack batch
    output = output.reshape(shape[:-1] + output.shape[-1:])

    return output


def biquad(waveform, b0, b1, b2, a0, a1, a2):
    # type: (Tensor, float, float, float, float, float, float) -> Tensor
    r"""Perform a biquad filter of input tensor.  Initial conditions set to 0.
    https://en.wikipedia.org/wiki/Digital_biquad_filter

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        b0 (float): numerator coefficient of current input, x[n]
        b1 (float): numerator coefficient of input one time step ago x[n-1]
        b2 (float): numerator coefficient of input two time steps ago x[n-2]
        a0 (float): denominator coefficient of current output y[n], typically 1
        a1 (float): denominator coefficient of output one time step ago y[n-1]
        a2 (float): denominator coefficient of output two time steps ago y[n-2]

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
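
    Example
        An identity-filter sketch (b0 = a0 = 1, all other coefficients 0),
        which should leave a normalized waveform unchanged:

        >>> waveform = torch.rand(1, 8000) * 2 - 1
        >>> out = biquad(waveform, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0)
        >>> torch.allclose(out, waveform)
        True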
    """

    device = waveform.device
    dtype = waveform.dtype

    output_waveform = lfilter(
        waveform,
        torch.tensor([a0, a1, a2], dtype=dtype, device=device),
        torch.tensor([b0, b1, b2], dtype=dtype, device=device)
    )
    return output_waveform


def _dB2Linear(x):
    # type: (float) -> float
    return math.exp(x * math.log(10) / 20.0)


def highpass_biquad(waveform, sample_rate, cutoff_freq, Q=0.707):
    # type: (Tensor, int, float, float) -> Tensor
    r"""Design biquad highpass filter and perform filtering.  Similar to SoX implementation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        cutoff_freq (float): filter cutoff frequency
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
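
    Example
        A sketch attenuating content below ~300 Hz (values are illustrative):

        >>> waveform = torch.rand(1, 8000) * 2 - 1
        >>> filtered = highpass_biquad(waveform, 16000, 300.0)
        >>> filtered.shape
        torch.Size([1, 8000])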
    """

    GAIN = 1.
    w0 = 2 * math.pi * cutoff_freq / sample_rate
    A = math.exp(GAIN / 40.0 * math.log(10))
    alpha = math.sin(w0) / 2. / Q
    mult = _dB2Linear(max(GAIN, 0))

    b0 = (1 + math.cos(w0)) / 2
    b1 = -1 - math.cos(w0)
    b2 = b0
    a0 = 1 + alpha
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


def lowpass_biquad(waveform, sample_rate, cutoff_freq, Q=0.707):
    # type: (Tensor, int, float, float) -> Tensor
    r"""Design biquad lowpass filter and perform filtering.  Similar to SoX implementation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        cutoff_freq (float): filter cutoff frequency
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
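
    Example
        A sketch attenuating content above ~1 kHz (values are illustrative):

        >>> waveform = torch.rand(1, 8000) * 2 - 1
        >>> filtered = lowpass_biquad(waveform, 16000, 1000.0)
        >>> filtered.shape
        torch.Size([1, 8000])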
    """

    GAIN = 1.
    w0 = 2 * math.pi * cutoff_freq / sample_rate
    A = math.exp(GAIN / 40.0 * math.log(10))
    alpha = math.sin(w0) / 2 / Q
    mult = _dB2Linear(max(GAIN, 0))

    b0 = (1 - math.cos(w0)) / 2
    b1 = 1 - math.cos(w0)
    b2 = b0
    a0 = 1 + alpha
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


def equalizer_biquad(waveform, sample_rate, center_freq, gain, Q=0.707):
    # type: (Tensor, int, float, float, float) -> Tensor
    r"""Design biquad peaking equalizer filter and perform filtering.  Similar to SoX implementation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        center_freq (float): filter's central frequency
        gain (float): desired gain at the boost (or attenuation) in dB
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
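
    Example
        A sketch boosting a band around 1 kHz by 3 dB (values are illustrative):

        >>> waveform = torch.rand(1, 8000) * 2 - 1
        >>> boosted = equalizer_biquad(waveform, 16000, 1000.0, 3.0)
        >>> boosted.shape
        torch.Size([1, 8000])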
    """
    w0 = 2 * math.pi * center_freq / sample_rate
    A = math.exp(gain / 40.0 * math.log(10))
    alpha = math.sin(w0) / 2 / Q

    b0 = 1 + alpha * A
    b1 = -2 * math.cos(w0)
    b2 = 1 - alpha * A
    a0 = 1 + alpha / A
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha / A
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


def mask_along_axis_iid(specgrams, mask_param, mask_value, axis):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Apply a mask along ``axis``. Mask will be applied from indices ``[v_0, v_0 + v)``, where
    ``v`` is sampled from ``uniform(0, mask_param)``, and ``v_0`` from ``uniform(0, max_v - v)``.
    All examples will have different masks.

    Args:
        specgrams (Tensor): Real spectrograms (batch, channel, freq, time)
        mask_param (int): Number of columns to be masked will be uniformly sampled from [0, mask_param]
        mask_value (float): Value to assign to the masked columns
        axis (int): Axis to apply masking on (2 -> frequency, 3 -> time)

    Returns:
        torch.Tensor: Masked spectrograms of dimensions (batch, channel, freq, time)
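
    Example
        A SpecAugment-style sketch masking up to 20 time steps independently
        per example (shapes are illustrative):

        >>> specgrams = torch.rand(4, 2, 201, 100)
        >>> masked = mask_along_axis_iid(specgrams, 20, 0.0, 3)
        >>> masked.shape
        torch.Size([4, 2, 201, 100])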
    """

    if axis != 2 and axis != 3:
        raise ValueError('Only Frequency and Time masking are supported')

    value = torch.rand(specgrams.shape[:2]) * mask_param
    min_value = torch.rand(specgrams.shape[:2]) * (specgrams.size(axis) - value)

    # Create broadcastable mask
    mask_start = (min_value.long())[..., None, None].float()
    mask_end = (min_value.long() + value.long())[..., None, None].float()
    mask = torch.arange(0, specgrams.size(axis)).float()

    # Per batch example masking
    specgrams = specgrams.transpose(axis, -1)
    specgrams.masked_fill_((mask >= mask_start) & (mask < mask_end), mask_value)
    specgrams = specgrams.transpose(axis, -1)

    return specgrams


def mask_along_axis(specgram, mask_param, mask_value, axis):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Apply a mask along ``axis``. Mask will be applied from indices ``[v_0, v_0 + v)``, where
    ``v`` is sampled from ``uniform(0, mask_param)``, and ``v_0`` from ``uniform(0, max_v - v)``.
    All examples will have the same mask interval.

    Args:
        specgram (Tensor): Real spectrogram (channel, freq, time)
        mask_param (int): Number of columns to be masked will be uniformly sampled from [0, mask_param]
        mask_value (float): Value to assign to the masked columns
        axis (int): Axis to apply masking on (1 -> frequency, 2 -> time)

    Returns:
        torch.Tensor: Masked spectrogram of dimensions (channel, freq, time)
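
    Example
        A sketch masking up to 15 frequency bins (shapes are illustrative):

        >>> specgram = torch.rand(2, 201, 100)
        >>> masked = mask_along_axis(specgram, 15, 0.0, 1)
        >>> masked.shape
        torch.Size([2, 201, 100])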
    """

    # pack batch
    shape = specgram.size()
    specgram = specgram.reshape([-1] + list(shape[-2:]))

    value = torch.rand(1) * mask_param
    min_value = torch.rand(1) * (specgram.size(axis) - value)

    mask_start = (min_value.long()).squeeze()
    mask_end = (min_value.long() + value.long()).squeeze()

    assert mask_end - mask_start < mask_param
    if axis == 1:
        specgram[:, mask_start:mask_end] = mask_value
    elif axis == 2:
        specgram[:, :, mask_start:mask_end] = mask_value
    else:
        raise ValueError('Only Frequency and Time masking are supported')

    # unpack batch
    specgram = specgram.reshape(shape[:-2] + specgram.shape[-2:])

    return specgram


def compute_deltas(specgram, win_length=5, mode="replicate"):
    # type: (Tensor, int, str) -> Tensor
    r"""Compute delta coefficients of a tensor, usually a spectrogram:

    .. math::
        d_t = \frac{\sum_{n=1}^{\text{N}} n (c_{t+n} - c_{t-n})}{2 \sum_{n=1}^{\text{N}} n^2}

    where :math:`d_t` is the deltas at time :math:`t`,
    :math:`c_t` is the spectrogram coefficients at time :math:`t`,
    :math:`N` is (`win_length`-1)//2.

    Args:
        specgram (torch.Tensor): Tensor of audio of dimension (..., freq, time)
        win_length (int): The window length used for computing delta
        mode (str): Mode parameter passed to padding

    Returns:
        deltas (torch.Tensor): Tensor of audio of dimension (..., freq, time)

    Example
        >>> specgram = torch.randn(1, 40, 1000)
        >>> delta = compute_deltas(specgram)
        >>> delta2 = compute_deltas(delta)
    """

    # pack batch
    shape = specgram.size()
    specgram = specgram.reshape(1, -1, shape[-1])

    assert win_length >= 3

    n = (win_length - 1) // 2

    # twice sum of integer squared
    denom = n * (n + 1) * (2 * n + 1) / 3

    specgram = torch.nn.functional.pad(specgram, (n, n), mode=mode)

    kernel = (
        torch
        .arange(-n, n + 1, 1, device=specgram.device, dtype=specgram.dtype)
        .repeat(specgram.shape[1], 1, 1)
    )

    output = torch.nn.functional.conv1d(specgram, kernel, groups=specgram.shape[1]) / denom

    # unpack batch
    output = output.reshape(shape)

    return output


def gain(waveform, gain_db=1.0):
    # type: (Tensor, float) -> Tensor
    r"""Apply amplification or attenuation to the whole waveform.

    Args:
       waveform (torch.Tensor): Tensor of audio of dimension (channel, time).
       gain_db (float): Gain adjustment in decibels (dB) (Default: `1.0`).

    Returns:
       torch.Tensor: the whole waveform amplified by gain_db.
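
    Example
        A sketch attenuating a waveform by 6 dB (roughly halving the amplitude):

        >>> waveform = torch.rand(1, 16000)
        >>> quieter = gain(waveform, -6.0)
        >>> torch.allclose(quieter, waveform * 10 ** (-6.0 / 20))
        True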
    """
    if gain_db == 0:
        return waveform

    ratio = 10 ** (gain_db / 20)

    return waveform * ratio


def _add_noise_shaping(dithered_waveform, waveform):
    r"""Noise shaping is calculated by error:
    error[n] = dithered[n] - original[n]
    noise_shaped_waveform[n] = dithered[n] + error[n-1]
    """
    wf_shape = waveform.size()
    waveform = waveform.reshape(-1, wf_shape[-1])

    dithered_shape = dithered_waveform.size()
    dithered_waveform = dithered_waveform.reshape(-1, dithered_shape[-1])

    error = dithered_waveform - waveform

    # add error[n-1] to dithered_waveform[n], so offset the error by 1 index
    for index in range(error.size()[0]):
        err = error[index]
        error_offset = torch.cat((torch.zeros(1), err))
        error[index] = error_offset[:waveform.size()[1]]

    noise_shaped = dithered_waveform + error
    return noise_shaped.reshape(dithered_shape[:-1] + noise_shaped.shape[-1:])


def _apply_probability_distribution(waveform, density_function="TPDF"):
    # type: (Tensor, str) -> Tensor
    r"""Apply a probability distribution function on a waveform.

    Triangular probability density function (TPDF) dither noise has a
    triangular distribution; values in the center of the range have a higher
    probability of occurring.

    Rectangular probability density function (RPDF) dither noise has a
    uniform distribution; any value in the specified range has the same
    probability of occurring.

    Gaussian probability density function (GPDF) has a normal distribution.
    The relationship of probabilities of results follows a bell-shaped,
    or Gaussian curve, typical of dither generated by analog sources.
    Args:
        waveform (torch.Tensor): Tensor of audio of dimension (channel, time)
        density_function (string): The density function of a
           continuous random variable (Default: `TPDF`)
           Options: Triangular Probability Density Function - `TPDF`
                    Rectangular Probability Density Function - `RPDF`
                    Gaussian Probability Density Function - `GPDF`
    Returns:
        torch.Tensor: waveform dithered with TPDF
    """
    shape = waveform.size()
    waveform = waveform.reshape(-1, shape[-1])

    channel_size = waveform.size()[0] - 1
    time_size = waveform.size()[-1] - 1

    random_channel = int(torch.randint(channel_size, [1, ]).item()) if channel_size > 0 else 0
    random_time = int(torch.randint(time_size, [1, ]).item()) if time_size > 0 else 0

    number_of_bits = 16
    up_scaling = 2 ** (number_of_bits - 1) - 2
    signal_scaled = waveform * up_scaling
    down_scaling = 2 ** (number_of_bits - 1)

    signal_scaled_dis = waveform
    if (density_function == "RPDF"):
        RPDF = waveform[random_channel][random_time] - 0.5

        signal_scaled_dis = signal_scaled + RPDF
    elif (density_function == "GPDF"):
        # TODO Replace by distribution code once
        # https://github.com/pytorch/pytorch/issues/29843 is resolved
        # gaussian = torch.distributions.normal.Normal(torch.mean(waveform, -1), 1).sample()

        num_rand_variables = 6

        gaussian = waveform[random_channel][random_time]
        for ws in num_rand_variables * [time_size]:
            rand_chan = int(torch.randint(channel_size, [1, ]).item())
            gaussian += waveform[rand_chan][int(torch.randint(ws, [1, ]).item())]

        signal_scaled_dis = signal_scaled + gaussian
    else:
        TPDF = torch.bartlett_window(time_size + 1)
        TPDF = TPDF.repeat((channel_size + 1), 1)
        signal_scaled_dis = signal_scaled + TPDF

    quantised_signal_scaled = torch.round(signal_scaled_dis)
    quantised_signal = quantised_signal_scaled / down_scaling
    return quantised_signal.reshape(shape[:-1] + quantised_signal.shape[-1:])


def dither(waveform, density_function="TPDF", noise_shaping=False):
    # type: (Tensor, str, bool) -> Tensor
    r"""Dither increases the perceived dynamic range of audio stored at a
    particular bit-depth by eliminating nonlinear truncation distortion
    (i.e. adding minimally perceived noise to mask distortion caused by quantization).
    Args:
       waveform (torch.Tensor): Tensor of audio of dimension (channel, time)
       density_function (string): The density function of a
           continuous random variable (Default: `TPDF`)
           Options: Triangular Probability Density Function - `TPDF`
                    Rectangular Probability Density Function - `RPDF`
                    Gaussian Probability Density Function - `GPDF`
       noise_shaping (boolean): a filtering process that shapes the spectral
           energy of quantisation error (Default: `False`)

    Returns:
       torch.Tensor: waveform dithered
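
    Example
        A sketch dithering a waveform with the default triangular (TPDF) noise:

        >>> waveform = torch.rand(2, 16000) * 2 - 1
        >>> dithered = dither(waveform)
        >>> dithered.shape
        torch.Size([2, 16000])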
    """
    dithered = _apply_probability_distribution(waveform, density_function=density_function)

    if noise_shaping:
        return _add_noise_shaping(dithered, waveform)
    else:
        return dithered


def _compute_nccf(waveform, sample_rate, frame_time, freq_low):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Compute Normalized Cross-Correlation Function (NCCF).

    .. math::
        \phi_i(m) = \frac{\sum_{n=b_i}^{b_i + N-1} w(n) w(m+n)}{\sqrt{E(b_i) E(m+b_i)}},

    where
    :math:`\phi_i(m)` is the NCCF at frame :math:`i` with lag :math:`m`,
    :math:`w` is the waveform,
    :math:`N` is the length of a frame,
    :math:`b_i` is the beginning of frame :math:`i`,
    :math:`E(j)` is the energy :math:`\sum_{n=j}^{j+N-1} w^2(n)`.
    """

    EPSILON = 10 ** (-9)

    # Number of lags to check
    lags = math.ceil(sample_rate / freq_low)

    frame_size = int(math.ceil(sample_rate * frame_time))

    waveform_length = waveform.size()[-1]
    num_of_frames = math.ceil(waveform_length / frame_size)

    p = lags + num_of_frames * frame_size - waveform_length
    waveform = torch.nn.functional.pad(waveform, (0, p))

    # Compute lags
    output_lag = []
    for lag in range(1, lags + 1):
        s1 = waveform[..., :-lag].unfold(-1, frame_size, frame_size)[
            ..., :num_of_frames, :
        ]
        s2 = waveform[..., lag:].unfold(-1, frame_size, frame_size)[
            ..., :num_of_frames, :
        ]

        output_frames = (
            (s1 * s2).sum(-1)
            / (EPSILON + torch.norm(s1, p=2, dim=-1)).pow(2)
            / (EPSILON + torch.norm(s2, p=2, dim=-1)).pow(2)
        )

        output_lag.append(output_frames.unsqueeze(-1))

    nccf = torch.cat(output_lag, -1)

    return nccf


def _combine_max(a, b, thresh=0.99):
    # type: (Tuple[Tensor, Tensor], Tuple[Tensor, Tensor], float) -> Tuple[Tensor, Tensor]
    """
    Take value from first if bigger than a multiplicative factor of the second, elementwise.
    """
    mask = (a[0] > thresh * b[0])
    values = mask * a[0] + ~mask * b[0]
    indices = mask * a[1] + ~mask * b[1]
    return values, indices


def _find_max_per_frame(nccf, sample_rate, freq_high):
    # type: (Tensor, int, int) -> Tensor
    r"""
    For each frame, take the highest value of NCCF,
    apply centered median smoothing, and convert to frequency.

    Note: If the max among all the lags is very close
    to the first half of lags, then the latter is taken.
    """

    lag_min = math.ceil(sample_rate / freq_high)

    # Find near enough max that is smallest

    best = torch.max(nccf[..., lag_min:], -1)

    half_size = nccf.shape[-1] // 2
    half = torch.max(nccf[..., lag_min:half_size], -1)

    best = _combine_max(half, best)
    indices = best[1]

    # Add back minimal lag
    indices += lag_min
    # Add 1 empirical calibration offset
    indices += 1

    return indices


def _median_smoothing(indices, win_length):
    # type: (Tensor, int) -> Tensor
    r"""
    Apply median smoothing to the 1D tensor over the given window.
    """

    # Centered windowed
    pad_length = (win_length - 1) // 2

    # "replicate" padding in any dimension
    indices = torch.nn.functional.pad(
        indices, (pad_length, 0), mode="constant", value=0.
    )

    indices[..., :pad_length] = torch.cat(pad_length * [indices[..., pad_length].unsqueeze(-1)], dim=-1)
    roll = indices.unfold(-1, win_length, 1)

    values, _ = torch.median(roll, -1)
    return values


def detect_pitch_frequency(
    waveform,
    sample_rate,
    frame_time=10 ** (-2),
    win_length=30,
    freq_low=85,
    freq_high=3400,
):
    # type: (Tensor, int, float, int, int, int) -> Tensor
    r"""Detect pitch frequency.

    It is implemented using normalized cross-correlation function and median smoothing.

    Args:
        waveform (torch.Tensor): Tensor of audio of dimension (..., time)
        sample_rate (int): The sample rate of the waveform (Hz)
        frame_time (float): Duration of a frame (seconds)
        win_length (int): The window length for median smoothing (in number of frames)
        freq_low (int): Lowest frequency that can be detected (Hz)
        freq_high (int): Highest frequency that can be detected (Hz)

    Returns:
        freq (torch.Tensor): Tensor of audio of dimension (..., frame)
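
    Example
        A sketch on a synthetic 440 Hz tone (sample rate and pitch are
        illustrative; the estimate is approximate):

        >>> sample_rate = 16000
        >>> t = torch.arange(sample_rate, dtype=torch.float) / sample_rate
        >>> waveform = torch.sin(2 * math.pi * 440 * t).unsqueeze(0)
        >>> freq = detect_pitch_frequency(waveform, sample_rate)  # values near 440 Hz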
    """

    # pack batch
    shape = list(waveform.size())
    waveform = waveform.reshape([-1] + shape[-1:])

    nccf = _compute_nccf(waveform, sample_rate, frame_time, freq_low)
    indices = _find_max_per_frame(nccf, sample_rate, freq_high)
    indices = _median_smoothing(indices, win_length)

    # Convert indices to frequency
    EPSILON = 10 ** (-9)
    freq = sample_rate / (EPSILON + indices.to(torch.float))

    # unpack batch
    freq = freq.reshape(shape[:-1] + list(freq.shape[-1:]))

    return freq