from __future__ import absolute_import, division, print_function, unicode_literals

import math

import torch

__all__ = [
    "istft",
    "spectrogram",
    "amplitude_to_DB",
    "create_fb_matrix",
    "create_dct",
    "mu_law_encoding",
    "mu_law_decoding",
    "complex_norm",
    "angle",
    "magphase",
    "phase_vocoder",
    "lfilter",
    "lowpass_biquad",
    "highpass_biquad",
    "equalizer_biquad",
    "biquad",
    "mask_along_axis",
    "mask_along_axis_iid",
    "compute_deltas",
    "detect_pitch_frequency",
]


# TODO: remove this once https://github.com/pytorch/pytorch/issues/21478 gets solved
@torch.jit.ignore
def _stft(
    waveform,
    n_fft,
    hop_length,
    win_length,
    window,
    center,
    pad_mode,
    normalized,
    onesided,
):
    # type: (Tensor, int, Optional[int], Optional[int], Optional[Tensor], bool, str, bool, bool) -> Tensor
    return torch.stft(
        waveform,
        n_fft,
        hop_length,
        win_length,
        window,
        center,
        pad_mode,
        normalized,
        onesided,
    )


def istft(
    stft_matrix,  # type: Tensor
    n_fft,  # type: int
    hop_length=None,  # type: Optional[int]
    win_length=None,  # type: Optional[int]
    window=None,  # type: Optional[Tensor]
    center=True,  # type: bool
    pad_mode="reflect",  # type: str
    normalized=False,  # type: bool
    onesided=True,  # type: bool
    length=None,  # type: Optional[int]
):
    # type: (...) -> Tensor
    r"""Inverse short time Fourier Transform. This is expected to be the inverse of torch.stft.
    It has the same parameters (+ additional optional parameter of ``length``) and it should return the
    least squares estimation of the original signal. The algorithm will check using the NOLA condition (
    nonzero overlap).

    An important consideration for the parameters ``window`` and ``center`` is that the envelope
    created by the summation of all the windows is never zero at any point in time. Specifically,
    :math:`\sum_{t=-\infty}^{\infty} w^2[n-t\times hop\_length] \neq 0`.

    Since stft discards elements at the end of the signal if they do not fit in a frame, the
    istft may return a shorter signal than the original signal (can occur if ``center`` is False
    since the signal isn't padded).

    If ``center`` is True, then there will be padding e.g. 'constant', 'reflect', etc. Left padding
    can be trimmed off exactly because it can be calculated, but right padding cannot be calculated
    without additional information.

    Example: Suppose the last window is:
    [17, 18, 0, 0, 0] vs [18, 0, 0, 0, 0]

    The n_frame, hop_length, win_length are all the same in both cases, which prevents the calculation of right padding.
    These additional values could be zeros or a reflection of the signal so providing ``length``
    could be useful. If ``length`` is ``None`` then padding will be aggressively removed
    (some loss of signal).

    [1] D. W. Griffin and J. S. Lim, "Signal estimation from modified short-time Fourier transform,"
    IEEE Trans. ASSP, vol.32, no.2, pp.236-243, Apr. 1984.

    Args:
        stft_matrix (torch.Tensor): Output of stft where each row of a channel is a frequency and each
            column is a window. It has a size of either (channel, fft_size, n_frame, 2) or
            (fft_size, n_frame, 2).
        n_fft (int): Size of Fourier transform
        hop_length (Optional[int]): The distance between neighboring sliding window frames.
            (Default: ``win_length // 4``)
        win_length (Optional[int]): The size of window frame and STFT filter. (Default: ``n_fft``)
        window (Optional[torch.Tensor]): The optional window function.
            (Default: ``torch.ones(win_length)``)
        center (bool): Whether ``input`` was padded on both sides so
            that the :math:`t`-th frame is centered at time :math:`t \times \text{hop\_length}`.
            (Default: ``True``)
        pad_mode (str): Controls the padding method used when ``center`` is True. (Default:
            ``'reflect'``)
        normalized (bool): Whether the STFT was normalized. (Default: ``False``)
        onesided (bool): Whether the STFT is onesided. (Default: ``True``)
        length (Optional[int]): The amount to trim the signal by (i.e. the
            original signal length). (Default: whole signal)

    Returns:
        torch.Tensor: Least squares estimation of the original signal of size
        (channel, signal_length) or (signal_length)
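
    Example (an illustrative round trip; the waveform and ``n_fft`` are assumed)
        >>> n_fft = 400
        >>> waveform = torch.randn(1, 16000)  # (channel, time)
        >>> stft_matrix = torch.stft(waveform, n_fft)  # (channel, n_fft // 2 + 1, n_frame, 2)
        >>> reconstructed = istft(stft_matrix, n_fft, length=waveform.size(1))
        >>> reconstructed.shape
        torch.Size([1, 16000])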
    """
    stft_matrix_dim = stft_matrix.dim()
    assert 3 <= stft_matrix_dim <= 4, "Incorrect stft dimension: %d" % (stft_matrix_dim)

    if stft_matrix_dim == 3:
        # add a channel dimension
        stft_matrix = stft_matrix.unsqueeze(0)

    dtype = stft_matrix.dtype
    device = stft_matrix.device
    fft_size = stft_matrix.size(1)
    assert (onesided and n_fft // 2 + 1 == fft_size) or (
        not onesided and n_fft == fft_size
    ), (
        "onesided implies that n_fft // 2 + 1 == fft_size and not onesided implies n_fft == fft_size. "
        + "Given values were onesided: %s, n_fft: %d, fft_size: %d"
        % ("True" if onesided else "False", n_fft, fft_size)
    )

    # use stft defaults for Optionals
    if win_length is None:
        win_length = n_fft

    if hop_length is None:
        hop_length = int(win_length // 4)

    # There must be overlap
    assert 0 < hop_length <= win_length
    assert 0 < win_length <= n_fft

    if window is None:
        window = torch.ones(win_length, requires_grad=False, device=device, dtype=dtype)

    assert window.dim() == 1 and window.size(0) == win_length

    if win_length != n_fft:
        # center window with pad left and right zeros
        left = (n_fft - win_length) // 2
        window = torch.nn.functional.pad(window, (left, n_fft - win_length - left))
        assert window.size(0) == n_fft
    # win_length and n_fft are synonymous from here on

    stft_matrix = stft_matrix.transpose(1, 2)  # size (channel, n_frame, fft_size, 2)
    stft_matrix = torch.irfft(
        stft_matrix, 1, normalized, onesided, signal_sizes=(n_fft,)
    )  # size (channel, n_frame, n_fft)

    assert stft_matrix.size(2) == n_fft
    n_frame = stft_matrix.size(1)

    ytmp = stft_matrix * window.view(1, 1, n_fft)  # size (channel, n_frame, n_fft)
    # each column of a channel is a frame which needs to be overlap added at the right place
    ytmp = ytmp.transpose(1, 2)  # size (channel, n_fft, n_frame)

    eye = torch.eye(n_fft, requires_grad=False, device=device, dtype=dtype).unsqueeze(
        1
    )  # size (n_fft, 1, n_fft)

    # this does overlap add where the frames of ytmp are added such that the i'th frame of
    # ytmp is added starting at i*hop_length in the output
    y = torch.nn.functional.conv_transpose1d(
        ytmp, eye, stride=hop_length, padding=0
    )  # size (channel, 1, expected_signal_len)

    # do the same for the window function
    window_sq = (
        window.pow(2).view(n_fft, 1).repeat((1, n_frame)).unsqueeze(0)
    )  # size (1, n_fft, n_frame)
    window_envelop = torch.nn.functional.conv_transpose1d(
        window_sq, eye, stride=hop_length, padding=0
    )  # size (1, 1, expected_signal_len)

    expected_signal_len = n_fft + hop_length * (n_frame - 1)
    assert y.size(2) == expected_signal_len
    assert window_envelop.size(2) == expected_signal_len

    half_n_fft = n_fft // 2
    # we need to trim the front padding away if center
    start = half_n_fft if center else 0
    end = -half_n_fft if length is None else start + length

    y = y[:, :, start:end]
    window_envelop = window_envelop[:, :, start:end]

    # check NOLA non-zero overlap condition
    window_envelop_lowest = window_envelop.abs().min()
    assert window_envelop_lowest > 1e-11, "window overlap add min: %f" % (
        window_envelop_lowest
    )

    y = (y / window_envelop).squeeze(1)  # size (channel, expected_signal_len)

    if stft_matrix_dim == 3:  # remove the channel dimension
        y = y.squeeze(0)
    return y


@torch.jit.script
def spectrogram(
    waveform, pad, window, n_fft, hop_length, win_length, power, normalized
):
    # type: (Tensor, int, Tensor, int, int, int, int, bool) -> Tensor
    r"""
    spectrogram(waveform, pad, window, n_fft, hop_length, win_length, power, normalized)

    Create a spectrogram from a raw audio signal.

    Args:
        waveform (torch.Tensor): Tensor of audio of dimension (channel, time)
        pad (int): Two sided padding of signal
        window (torch.Tensor): Window tensor that is applied/multiplied to each frame/window
        n_fft (int): Size of FFT
        hop_length (int): Length of hop between STFT windows
        win_length (int): Window size
        power (int): Exponent for the magnitude spectrogram,
            (must be > 0) e.g., 1 for energy, 2 for power, etc.
        normalized (bool): Whether to normalize by magnitude after stft

    Returns:
        torch.Tensor: Dimension (channel, freq, time), where channel
        is unchanged, freq is ``n_fft // 2 + 1`` and ``n_fft`` is the number of
        Fourier bins, and time is the number of window hops (n_frame).
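
    Example (a minimal sketch; the waveform and STFT parameters are assumed)
        >>> waveform = torch.randn(1, 16000)  # (channel, time)
        >>> n_fft, hop_length, win_length = 400, 200, 400
        >>> window = torch.hann_window(win_length)
        >>> specgram = spectrogram(waveform, 0, window, n_fft, hop_length, win_length, 2, False)
        >>> specgram.shape  # freq = n_fft // 2 + 1
        torch.Size([1, 201, 81])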
    """
    assert waveform.dim() == 2

    if pad > 0:
        # TODO add "with torch.no_grad():" back when JIT supports it
        waveform = torch.nn.functional.pad(waveform, (pad, pad), "constant")

    # default values are consistent with librosa.core.spectrum._spectrogram
    spec_f = _stft(
        waveform, n_fft, hop_length, win_length, window, True, "reflect", False, True
    )

    if normalized:
        spec_f /= window.pow(2).sum().sqrt()
    spec_f = spec_f.pow(2.).sum(-1).pow(0.5 * power)  # magnitude of "complex" tensor raised to ``power``
    return spec_f


@torch.jit.script
def amplitude_to_DB(x, multiplier, amin, db_multiplier, top_db=None):
    # type: (Tensor, float, float, float, Optional[float]) -> Tensor
    r"""
    amplitude_to_DB(x, multiplier, amin, db_multiplier, top_db=None)

    Turns a tensor from the power/amplitude scale to the decibel scale.

    This output depends on the maximum value in the input tensor, and so
    may return different values for an audio clip split into snippets vs. a
    full clip.

    Args:
        x (torch.Tensor): Input tensor before being converted to decibel scale
        multiplier (float): Use 10. for power and 20. for amplitude
        amin (float): Lower bound to clamp ``x`` (avoids taking the log of zero)
        db_multiplier (float): Log10(max(reference value and amin))
        top_db (Optional[float]): Minimum negative cut-off in decibels. A reasonable number
            is 80. (Default: ``None``)

    Returns:
        torch.Tensor: Output tensor in decibel scale
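
    Example (illustrative values; a stand-in power spectrogram is assumed)
        >>> specgram = torch.randn(1, 201, 81).pow(2)  # stand-in for a power spectrogram
        >>> # multiplier=10. for power; db_multiplier=0. corresponds to a unit reference
        >>> specgram_db = amplitude_to_DB(specgram, 10., 1e-10, 0., 80.)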
    """
    x_db = multiplier * torch.log10(torch.clamp(x, min=amin))
    x_db -= multiplier * db_multiplier

    if top_db is not None:
        new_x_db_max = torch.tensor(
            float(x_db.max()) - top_db, dtype=x_db.dtype, device=x_db.device
        )
        x_db = torch.max(x_db, new_x_db_max)

    return x_db


@torch.jit.script
def create_fb_matrix(n_freqs, f_min, f_max, n_mels, sample_rate):
    # type: (int, float, float, int, int) -> Tensor
    r"""
    create_fb_matrix(n_freqs, f_min, f_max, n_mels, sample_rate)

    Create a frequency bin conversion matrix.

    Args:
        n_freqs (int): Number of frequencies to highlight/apply
        f_min (float): Minimum frequency (Hz)
        f_max (float): Maximum frequency (Hz)
        n_mels (int): Number of mel filterbanks
        sample_rate (int): Sample rate of the audio waveform

    Returns:
        torch.Tensor: Triangular filter banks (fb matrix) of size (``n_freqs``, ``n_mels``)
        i.e. number of frequencies to highlight/apply by the number of filterbanks.
        Each column is a filterbank so that assuming there is a matrix A of
        size (..., ``n_freqs``), the applied result would be
        ``A * create_fb_matrix(A.size(-1), ...)``.
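
    Example (a sketch of applying the filterbank; shapes and rates are assumed)
        >>> n_freqs, n_mels, sample_rate = 201, 40, 16000
        >>> fb = create_fb_matrix(n_freqs, 0., sample_rate / 2., n_mels, sample_rate)
        >>> specgram = torch.randn(1, n_freqs, 100)  # (channel, freq, time)
        >>> mel_specgram = torch.matmul(specgram.transpose(1, 2), fb).transpose(1, 2)
        >>> mel_specgram.shape
        torch.Size([1, 40, 100])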
    """
    # freq bins
    # Equivalent filterbank construction by Librosa
    all_freqs = torch.linspace(0, sample_rate // 2, n_freqs)
    i_freqs = all_freqs.ge(f_min) & all_freqs.le(f_max)
    freqs = all_freqs[i_freqs]

    # calculate mel freq bins
    # hertz to mel(f) is 2595. * math.log10(1. + (f / 700.))
    m_min = 2595.0 * math.log10(1.0 + (f_min / 700.0))
    m_max = 2595.0 * math.log10(1.0 + (f_max / 700.0))
    m_pts = torch.linspace(m_min, m_max, n_mels + 2)
    # mel to hertz(mel) is 700. * (10**(mel / 2595.) - 1.)
    f_pts = 700.0 * (10 ** (m_pts / 2595.0) - 1.0)
    # calculate the difference between each mel point and each stft freq point in hertz
    f_diff = f_pts[1:] - f_pts[:-1]  # (n_mels + 1)
    slopes = f_pts.unsqueeze(0) - all_freqs.unsqueeze(1)  # (n_freqs, n_mels + 2)
    # create overlapping triangles
    zero = torch.zeros(1)
    down_slopes = (-1.0 * slopes[:, :-2]) / f_diff[:-1]  # (n_freqs, n_mels)
    up_slopes = slopes[:, 2:] / f_diff[1:]  # (n_freqs, n_mels)
    fb = torch.max(zero, torch.min(down_slopes, up_slopes))
    return fb


@torch.jit.script
def create_dct(n_mfcc, n_mels, norm):
    # type: (int, int, Optional[str]) -> Tensor
    r"""
    create_dct(n_mfcc, n_mels, norm)

    Creates a DCT transformation matrix with shape (``n_mels``, ``n_mfcc``),
    normalized depending on norm.

    Args:
        n_mfcc (int): Number of mfc coefficients to retain
        n_mels (int): Number of mel filterbanks
        norm (Optional[str]): Norm to use (either 'ortho' or None)

    Returns:
        torch.Tensor: The transformation matrix of size (``n_mels``, ``n_mfcc``),
        to be right-multiplied to row-wise mel data.
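
    Example (a sketch of computing MFCCs from a mel spectrogram; inputs are assumed)
        >>> n_mfcc, n_mels = 13, 40
        >>> dct_mat = create_dct(n_mfcc, n_mels, "ortho")  # (n_mels, n_mfcc)
        >>> mel_specgram = torch.randn(1, n_mels, 100)  # (channel, n_mels, time)
        >>> mfcc = torch.matmul(mel_specgram.transpose(1, 2), dct_mat).transpose(1, 2)
        >>> mfcc.shape
        torch.Size([1, 13, 100])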
    """
    # http://en.wikipedia.org/wiki/Discrete_cosine_transform#DCT-II
    n = torch.arange(float(n_mels))
    k = torch.arange(float(n_mfcc)).unsqueeze(1)
    dct = torch.cos(math.pi / float(n_mels) * (n + 0.5) * k)  # size (n_mfcc, n_mels)
    if norm is None:
        dct *= 2.0
    else:
        assert norm == "ortho"
        dct[0] *= 1.0 / math.sqrt(2.0)
        dct *= math.sqrt(2.0 / float(n_mels))
    return dct.t()


@torch.jit.script
def mu_law_encoding(x, quantization_channels):
    # type: (Tensor, int) -> Tensor
    r"""
    mu_law_encoding(x, quantization_channels)

    Encode signal based on mu-law companding.  For more info see the
    `Wikipedia Entry <https://en.wikipedia.org/wiki/%CE%9C-law_algorithm>`_

    This algorithm assumes the signal has been scaled to between -1 and 1 and
    returns a signal encoded with values from 0 to quantization_channels - 1.

    Args:
        x (torch.Tensor): Input tensor
        quantization_channels (int): Number of channels

    Returns:
        torch.Tensor: Input after mu-law encoding
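
    Example (an illustrative round trip with ``mu_law_decoding``; input assumed in [-1, 1])
        >>> x = torch.linspace(-1., 1., 5)
        >>> x_mu = mu_law_encoding(x, 256)  # integer codes in [0, 255]
        >>> x_hat = mu_law_decoding(x_mu, 256)  # approximately recovers x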
    """
    mu = quantization_channels - 1.0
    if not x.is_floating_point():
        x = x.to(torch.float)
    mu = torch.tensor(mu, dtype=x.dtype)
    x_mu = torch.sign(x) * torch.log1p(mu * torch.abs(x)) / torch.log1p(mu)
    x_mu = ((x_mu + 1) / 2 * mu + 0.5).to(torch.int64)
    return x_mu


@torch.jit.script
def mu_law_decoding(x_mu, quantization_channels):
    # type: (Tensor, int) -> Tensor
    r"""
    mu_law_decoding(x_mu, quantization_channels)

    Decode mu-law encoded signal.  For more info see the
    `Wikipedia Entry <https://en.wikipedia.org/wiki/%CE%9C-law_algorithm>`_

    This expects an input with values between 0 and quantization_channels - 1
    and returns a signal scaled between -1 and 1.

    Args:
        x_mu (torch.Tensor): Input tensor
        quantization_channels (int): Number of channels

    Returns:
        torch.Tensor: Input after mu-law decoding
    """
    mu = quantization_channels - 1.0
    if not x_mu.is_floating_point():
        x_mu = x_mu.to(torch.float)
    mu = torch.tensor(mu, dtype=x_mu.dtype)
    x = (x_mu / mu) * 2 - 1.0
    x = torch.sign(x) * (torch.exp(torch.abs(x) * torch.log1p(mu)) - 1.0) / mu
    return x


@torch.jit.script
def complex_norm(complex_tensor, power=1.0):
    # type: (Tensor, float) -> Tensor
    r"""Compute the norm of complex tensor input.

    Args:
        complex_tensor (torch.Tensor): Tensor shape of `(*, complex=2)`
        power (float): Power of the norm. (Default: `1.0`).

    Returns:
        torch.Tensor: The 2-norm of the input tensor raised to ``power``. Shape of `(*, )`
    """
    if power == 1.0:
        return torch.norm(complex_tensor, 2, -1)
    return torch.norm(complex_tensor, 2, -1).pow(power)


@torch.jit.script
def angle(complex_tensor):
    # type: (Tensor) -> Tensor
    r"""Compute the angle of complex tensor input.

    Args:
        complex_tensor (torch.Tensor): Tensor shape of `(*, complex=2)`

    Return:
        torch.Tensor: Angle of a complex tensor. Shape of `(*, )`
    """
    return torch.atan2(complex_tensor[..., 1], complex_tensor[..., 0])


@torch.jit.script
def magphase(complex_tensor, power=1.0):
    # type: (Tensor, float) -> Tuple[Tensor, Tensor]
    r"""Separate a complex-valued spectrogram with shape `(*, 2)` into its magnitude and phase.

    Args:
        complex_tensor (torch.Tensor): Tensor shape of `(*, complex=2)`
        power (float): Power of the norm. (Default: `1.0`)

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: The magnitude and phase of the complex tensor
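
    Example (illustrative; a random "complex" tensor stands in for an STFT output)
        >>> complex_tensor = torch.randn(1, 201, 81, 2)  # (*, complex=2)
        >>> mag, phase = magphase(complex_tensor, 1.)
        >>> mag.shape == phase.shape == torch.Size([1, 201, 81])
        True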
    """
    mag = complex_norm(complex_tensor, power)
    phase = angle(complex_tensor)
    return mag, phase


@torch.jit.script
def phase_vocoder(complex_specgrams, rate, phase_advance):
    # type: (Tensor, float, Tensor) -> Tensor
    r"""Given a STFT tensor, speed up in time without modifying pitch by a
    factor of ``rate``.
    Args:
        complex_specgrams (torch.Tensor): Dimension of `(channel, freq, time, complex=2)`
        rate (float): Speed-up factor
        phase_advance (torch.Tensor): Expected phase advance in each bin. Dimension
            of (freq, 1)
    Returns:
        complex_specgrams_stretch (torch.Tensor): Dimension of `(channel,
        freq, ceil(time/rate), complex=2)`
    Example
        >>> freq, hop_length = 1025, 512
        >>> # (channel, freq, time, complex=2)
        >>> complex_specgrams = torch.randn(2, freq, 300, 2)
        >>> rate = 1.3 # Speed up by 30%
        >>> phase_advance = torch.linspace(
        >>>    0, math.pi * hop_length, freq)[..., None]
        >>> x = phase_vocoder(complex_specgrams, rate, phase_advance)
        >>> x.shape # with 231 == ceil(300 / 1.3)
        torch.Size([2, 1025, 231, 2])
    """

    time_steps = torch.arange(0,
                              complex_specgrams.size(-2),
                              rate,
                              device=complex_specgrams.device,
                              dtype=complex_specgrams.dtype)

    alphas = time_steps % 1.0
    phase_0 = angle(complex_specgrams[:, :, :1])

    # Time Padding
    complex_specgrams = torch.nn.functional.pad(complex_specgrams, [0, 0, 0, 2])

    # (channel, freq, ceil(time / rate), complex=2)
    complex_specgrams_0 = complex_specgrams[:, :, time_steps.long()]
    complex_specgrams_1 = complex_specgrams[:, :, (time_steps + 1).long()]

    angle_0 = angle(complex_specgrams_0)
    angle_1 = angle(complex_specgrams_1)

    norm_0 = torch.norm(complex_specgrams_0, p=2, dim=-1)
    norm_1 = torch.norm(complex_specgrams_1, p=2, dim=-1)

    phase = angle_1 - angle_0 - phase_advance
    phase = phase - 2 * math.pi * torch.round(phase / (2 * math.pi))

    # Compute Phase Accum
    phase = phase + phase_advance
    phase = torch.cat([phase_0, phase[:, :, :-1]], dim=-1)
    phase_acc = torch.cumsum(phase, -1)

    mag = alphas * norm_1 + (1 - alphas) * norm_0

    real_stretch = mag * torch.cos(phase_acc)
    imag_stretch = mag * torch.sin(phase_acc)

    complex_specgrams_stretch = torch.stack([real_stretch, imag_stretch], dim=-1)

    return complex_specgrams_stretch


@torch.jit.script
def lfilter(waveform, a_coeffs, b_coeffs):
    # type: (Tensor, Tensor, Tensor) -> Tensor
    r"""
    Performs an IIR filter by evaluating the difference equation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`.  Must be normalized to -1 to 1.
        a_coeffs (torch.Tensor): denominator coefficients of difference equation of dimension of `(n_order + 1)`.
                                Lower delay coefficients come first, e.g. `[a0, a1, a2, ...]`.
                                Must be same size as b_coeffs (pad with 0's as necessary).
        b_coeffs (torch.Tensor): numerator coefficients of difference equation of dimension of `(n_order + 1)`.
                                 Lower delay coefficients come first, e.g. `[b0, b1, b2, ...]`.
                                 Must be same size as a_coeffs (pad with 0's as necessary).

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`.  Output will be clipped to -1 to 1.
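
    Example (a simple one-pole smoothing filter; the coefficients are illustrative)
        >>> waveform = torch.rand(1, 16000) * 2 - 1  # (channel, time) in [-1, 1]
        >>> a_coeffs = torch.tensor([1.0, -0.95, 0.0])  # y[n] = 0.05 * x[n] + 0.95 * y[n-1]
        >>> b_coeffs = torch.tensor([0.05, 0.0, 0.0])
        >>> smoothed = lfilter(waveform, a_coeffs, b_coeffs)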

    """

    assert(a_coeffs.size(0) == b_coeffs.size(0))
    assert(len(waveform.size()) == 2)
    assert(waveform.device == a_coeffs.device)
    assert(b_coeffs.device == a_coeffs.device)

    device = waveform.device
    dtype = waveform.dtype
    n_channel, n_sample = waveform.size()
    n_order = a_coeffs.size(0)
    assert(n_order > 0)

    # Pad the input and create output
    padded_waveform = torch.zeros(n_channel, n_sample + n_order - 1, dtype=dtype, device=device)
    padded_waveform[:, (n_order - 1):] = waveform
    padded_output_waveform = torch.zeros(n_channel, n_sample + n_order - 1, dtype=dtype, device=device)

    # Set up the coefficients matrix
    # Flip order, repeat, and transpose
    a_coeffs_filled = a_coeffs.flip(0).repeat(n_channel, 1).t()
    b_coeffs_filled = b_coeffs.flip(0).repeat(n_channel, 1).t()

    # Set up a few other utilities
    a0_repeated = torch.ones(n_channel, dtype=dtype, device=device) * a_coeffs[0]
    ones = torch.ones(n_channel, n_sample, dtype=dtype, device=device)

    for i_sample in range(n_sample):

        o0 = torch.zeros(n_channel, dtype=dtype, device=device)

        windowed_input_signal = padded_waveform[:, i_sample:(i_sample + n_order)]
        windowed_output_signal = padded_output_waveform[:, i_sample:(i_sample + n_order)]

        o0.add_(torch.diag(torch.mm(windowed_input_signal, b_coeffs_filled)))
        o0.sub_(torch.diag(torch.mm(windowed_output_signal, a_coeffs_filled)))

        o0.div_(a0_repeated)

        padded_output_waveform[:, i_sample + n_order - 1] = o0

    return torch.min(ones, torch.max(ones * -1, padded_output_waveform[:, (n_order - 1):]))


@torch.jit.script
def biquad(waveform, b0, b1, b2, a0, a1, a2):
    # type: (Tensor, float, float, float, float, float, float) -> Tensor
    r"""Performs a biquad filter of input tensor.  Initial conditions set to 0.
    https://en.wikipedia.org/wiki/Digital_biquad_filter

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        b0 (float): numerator coefficient of current input, x[n]
        b1 (float): numerator coefficient of input one time step ago x[n-1]
        b2 (float): numerator coefficient of input two time steps ago x[n-2]
        a0 (float): denominator coefficient of current output y[n], typically 1
        a1 (float): denominator coefficient of output one time step ago y[n-1]
        a2 (float): denominator coefficient of output two time steps ago y[n-2]

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
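
    Example (an identity biquad shown for illustration; coefficients are assumed)
        >>> waveform = torch.rand(1, 16000) * 2 - 1  # (channel, time)
        >>> out = biquad(waveform, 1., 0., 0., 1., 0., 0.)  # b = [1, 0, 0], a = [1, 0, 0] leaves the signal unchanged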
    """

    device = waveform.device
    dtype = waveform.dtype

    output_waveform = lfilter(
        waveform,
        torch.tensor([a0, a1, a2], dtype=dtype, device=device),
        torch.tensor([b0, b1, b2], dtype=dtype, device=device)
    )
    return output_waveform


def _dB2Linear(x):
    # type: (float) -> float
    return math.exp(x * math.log(10) / 20.0)


@torch.jit.script
def highpass_biquad(waveform, sample_rate, cutoff_freq, Q=0.707):
    # type: (Tensor, int, float, float) -> Tensor
    r"""Designs biquad highpass filter and performs filtering.  Similar to SoX implementation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        cutoff_freq (float): filter cutoff frequency
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
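
    Example (illustrative parameter values)
        >>> waveform = torch.rand(1, 44100) * 2 - 1  # (channel, time)
        >>> filtered = highpass_biquad(waveform, 44100, 2000.)  # attenuate content below ~2 kHz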
    """

    w0 = 2 * math.pi * cutoff_freq / sample_rate
    alpha = math.sin(w0) / 2. / Q

    b0 = (1 + math.cos(w0)) / 2
    b1 = -1 - math.cos(w0)
    b2 = b0
    a0 = 1 + alpha
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


@torch.jit.script
def lowpass_biquad(waveform, sample_rate, cutoff_freq, Q=0.707):
    # type: (Tensor, int, float, float) -> Tensor
    r"""Designs biquad lowpass filter and performs filtering.  Similar to SoX implementation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        cutoff_freq (float): filter cutoff frequency
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
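
    Example (illustrative parameter values)
        >>> waveform = torch.rand(1, 44100) * 2 - 1  # (channel, time)
        >>> filtered = lowpass_biquad(waveform, 44100, 400.)  # attenuate content above ~400 Hz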
    """

    w0 = 2 * math.pi * cutoff_freq / sample_rate
    alpha = math.sin(w0) / 2 / Q

    b0 = (1 - math.cos(w0)) / 2
    b1 = 1 - math.cos(w0)
    b2 = b0
    a0 = 1 + alpha
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


# @torch.jit.script
def equalizer_biquad(waveform, sample_rate, center_freq, gain, Q=0.707):
    # type: (Tensor, int, float, float, float) -> Tensor
    r"""Designs biquad peaking equalizer filter and performs filtering.  Similar to SoX implementation.

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        center_freq (float): filter's central frequency
        gain (float): desired gain at the boost (or attenuation) in dB
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
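
    Example (illustrative parameter values)
        >>> waveform = torch.rand(1, 44100) * 2 - 1  # (channel, time)
        >>> boosted = equalizer_biquad(waveform, 44100, 1000., 3.)  # +3 dB around 1 kHz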
    """
    w0 = 2 * math.pi * center_freq / sample_rate
    A = math.exp(gain / 40.0 * math.log(10))
    alpha = math.sin(w0) / 2 / Q

    b0 = 1 + alpha * A
    b1 = -2 * math.cos(w0)
    b2 = 1 - alpha * A
    a0 = 1 + alpha / A
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha / A
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


@torch.jit.script
def mask_along_axis_iid(specgrams, mask_param, mask_value, axis):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Apply a mask along ``axis``. Mask will be applied from indices ``[v_0, v_0 + v)``, where
    ``v`` is sampled from ``uniform(0, mask_param)``, and ``v_0`` from ``uniform(0, max_v - v)``.
    The mask is sampled independently for every example of the batch.

    Args:
        specgrams (Tensor): Real spectrograms (batch, channel, freq, time)
        mask_param (int): Number of columns to be masked will be uniformly sampled from [0, mask_param]
        mask_value (float): Value to assign to the masked columns
        axis (int): Axis to apply masking on (2 -> frequency, 3 -> time)

    Returns:
        torch.Tensor: Masked spectrograms of dimensions (batch, channel, freq, time)
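
    Example (illustrative; a random batch of spectrograms is assumed)
        >>> specgrams = torch.randn(4, 2, 201, 100)  # (batch, channel, freq, time)
        >>> masked = mask_along_axis_iid(specgrams, 27, 0., 2)  # frequency masking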
    """

    if axis != 2 and axis != 3:
        raise ValueError('Only Frequency and Time masking are supported')

    value = torch.rand(specgrams.shape[:2], device=specgrams.device) * mask_param
    min_value = torch.rand(specgrams.shape[:2], device=specgrams.device) * (specgrams.size(axis) - value)

    # Create broadcastable mask
    mask_start = (min_value.long())[..., None, None].float()
    mask_end = (min_value.long() + value.long())[..., None, None].float()
    mask = torch.arange(0, specgrams.size(axis), device=specgrams.device).float()

    # Per batch example masking
    specgrams = specgrams.transpose(axis, -1)
    specgrams.masked_fill_((mask >= mask_start) & (mask < mask_end), mask_value)
    specgrams = specgrams.transpose(axis, -1)

    return specgrams


@torch.jit.script
def mask_along_axis(specgram, mask_param, mask_value, axis):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Apply a mask along ``axis``. Mask will be applied from indices ``[v_0, v_0 + v)``, where
    ``v`` is sampled from ``uniform(0, mask_param)``, and ``v_0`` from ``uniform(0, max_v - v)``.
    All examples will have the same mask interval.

    Args:
        specgram (Tensor): Real spectrogram (channel, freq, time)
        mask_param (int): Number of columns to be masked will be uniformly sampled from [0, mask_param]
        mask_value (float): Value to assign to the masked columns
        axis (int): Axis to apply masking on (1 -> frequency, 2 -> time)

    Returns:
        torch.Tensor: Masked spectrogram of dimensions (channel, freq, time)
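
    Example (illustrative; a random spectrogram is assumed)
        >>> specgram = torch.randn(2, 201, 100)  # (channel, freq, time)
        >>> masked = mask_along_axis(specgram, 30, 0., 2)  # time masking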
    """

    value = torch.rand(1) * mask_param
    min_value = torch.rand(1) * (specgram.size(axis) - value)

    mask_start = (min_value.long()).squeeze()
    mask_end = (min_value.long() + value.long()).squeeze()

    assert mask_end - mask_start < mask_param
    if axis == 1:
        specgram[:, mask_start:mask_end] = mask_value
    elif axis == 2:
        specgram[:, :, mask_start:mask_end] = mask_value
    else:
        raise ValueError('Only Frequency and Time masking are supported')

    return specgram


@torch.jit.script
def compute_deltas(specgram, win_length=5, mode="replicate"):
    # type: (Tensor, int, str) -> Tensor
    r"""Compute delta coefficients of a tensor, usually a spectrogram:

    .. math::
        d_t = \frac{\sum_{n=1}^{\text{N}} n (c_{t+n} - c_{t-n})}{2 \sum_{n=1}^{\text{N}} n^2}

    where :math:`d_t` is the deltas at time :math:`t`,
    :math:`c_t` is the spectrogram coefficients at time :math:`t`,
    :math:`N` is ``(win_length - 1) // 2``.

    Args:
        specgram (torch.Tensor): Tensor of audio of dimension (channel, freq, time)
        win_length (int): The window length used for computing delta
        mode (str): Mode parameter passed to padding

    Returns:
        deltas (torch.Tensor): Tensor of audio of dimension (channel, freq, time)

    Example
        >>> specgram = torch.randn(1, 40, 1000)
        >>> delta = compute_deltas(specgram)
        >>> delta2 = compute_deltas(delta)
    """

    assert win_length >= 3
    assert specgram.dim() == 3
    assert not specgram.shape[1] % specgram.shape[0]

    n = (win_length - 1) // 2

    # twice sum of integer squared
    denom = n * (n + 1) * (2 * n + 1) / 3

    specgram = torch.nn.functional.pad(specgram, (n, n), mode=mode)

    kernel = (
        torch
        .arange(-n, n + 1, 1, device=specgram.device, dtype=specgram.dtype)
        .repeat(specgram.shape[1], specgram.shape[0], 1)
    )

    return torch.nn.functional.conv1d(
        specgram, kernel, groups=specgram.shape[1] // specgram.shape[0]
    ) / denom


@torch.jit.script
def _compute_nccf(waveform, sample_rate, frame_time, freq_low):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Compute Normalized Cross-Correlation Function (NCCF).

    .. math::
        \phi_i(m) = \frac{\sum_{n=b_i}^{b_i + N-1} w(n) w(m+n)}{\sqrt{E(b_i) E(m+b_i)}},

    where
    :math:`\phi_i(m)` is the NCCF at frame :math:`i` with lag :math:`m`,
    :math:`w` is the waveform,
    :math:`N` is the length of a frame,
    :math:`b_i` is the beginning of frame :math:`i`,
    :math:`E(j)` is the energy :math:`\sum_{n=j}^{j+N-1} w^2(n)`.
    """

    EPSILON = 10 ** (-9)

    # Number of lags to check
    lags = math.ceil(sample_rate / freq_low)

    frame_size = int(math.ceil(sample_rate * frame_time))

    waveform_length = waveform.size()[-1]
    num_of_frames = math.ceil(waveform_length / frame_size)

    p = lags + num_of_frames * frame_size - waveform_length
    waveform = torch.nn.functional.pad(waveform, (0, p))

    # Compute lags
    output_lag = []
    for lag in range(1, lags + 1):
        s1 = waveform[..., :-lag].unfold(-1, frame_size, frame_size)[
            ..., :num_of_frames, :
        ]
        s2 = waveform[..., lag:].unfold(-1, frame_size, frame_size)[
            ..., :num_of_frames, :
        ]

        output_frames = (
            (s1 * s2).sum(-1)
            / (EPSILON + torch.norm(s1, p=2, dim=-1)).pow(2)
            / (EPSILON + torch.norm(s2, p=2, dim=-1)).pow(2)
        )

        output_lag.append(output_frames.unsqueeze(-1))

    nccf = torch.cat(output_lag, -1)

    return nccf


def _combine_max(a, b, thresh=0.99):
    # type: (Tuple[Tensor, Tensor], Tuple[Tensor, Tensor], float) -> Tuple[Tensor, Tensor]
    """
    Take value from first if bigger than a multiplicative factor of the second, elementwise.
    """
    mask = (a[0] > thresh * b[0])
    values = mask * a[0] + ~mask * b[0]
    indices = mask * a[1] + ~mask * b[1]
    return values, indices


def _find_max_per_frame(nccf, sample_rate, freq_high):
    # type: (Tensor, int, int) -> Tensor
    r"""
    For each frame, take the highest value of NCCF,
    apply centered median smoothing, and convert to frequency.

    Note: If the max among all the lags is very close
    to the first half of lags, then the latter is taken.
    """

    lag_min = math.ceil(sample_rate / freq_high)

    # Find the max; prefer a peak at a smaller lag when it is nearly as large

    best = torch.max(nccf[..., lag_min:], -1)

    half_size = nccf.shape[-1] // 2
    half = torch.max(nccf[..., lag_min:half_size], -1)

    best = _combine_max(half, best)
    indices = best[1]

    # Add back minimal lag
    indices += lag_min
    # Add 1 empirical calibration offset
    indices += 1

    return indices


def _median_smoothing(indices, win_length):
    # type: (Tensor, int) -> Tensor
    r"""
    Apply median smoothing to the 1D tensor over the given window.
    """

    # Centered windowed
    pad_length = (win_length - 1) // 2

    # "replicate" padding in any dimension
    indices = torch.nn.functional.pad(
        indices, (pad_length, 0), mode="constant", value=0.
    )

    indices[..., :pad_length] = torch.cat(pad_length * [indices[..., pad_length].unsqueeze(-1)], dim=-1)
    roll = indices.unfold(-1, win_length, 1)

    values, _ = torch.median(roll, -1)
    return values


@torch.jit.script
def detect_pitch_frequency(
    waveform,
    sample_rate,
    frame_time=10 ** (-2),
    win_length=30,
    freq_low=85,
    freq_high=3400,
):
    # type: (Tensor, int, float, int, int, int) -> Tensor
    r"""Detect pitch frequency.

    It is implemented using normalized cross-correlation function and median smoothing.

    Args:
        waveform (torch.Tensor): Tensor of audio of dimension (channel, time)
        sample_rate (int): The sample rate of the waveform (Hz)
        frame_time (float): Duration of a frame used for the NCCF computation (seconds)
        win_length (int): The window length for median smoothing (in number of frames)
        freq_low (int): Lowest frequency that can be detected (Hz)
        freq_high (int): Highest frequency that can be detected (Hz)

    Returns:
        freq (torch.Tensor): Detected pitch frequencies of dimension (channel, frame)
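
    Example (illustrative; a synthetic 440 Hz tone is assumed)
        >>> sample_rate = 16000
        >>> t = torch.arange(sample_rate, dtype=torch.float) / sample_rate
        >>> waveform = torch.sin(2 * math.pi * 440 * t).unsqueeze(0)  # (channel, time)
        >>> freq = detect_pitch_frequency(waveform, sample_rate)  # values close to 440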
    """

    nccf = _compute_nccf(waveform, sample_rate, frame_time, freq_low)
    indices = _find_max_per_frame(nccf, sample_rate, freq_high)
    indices = _median_smoothing(indices, win_length)

    # Convert indices to frequency
    EPSILON = 10 ** (-9)
    freq = sample_rate / (EPSILON + indices.to(torch.float))

    return freq