1#!/usr/bin/env python3
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""This module provides utilities to do audio data analysis."""
17
18import logging
19import numpy
20import soundfile
21from scipy.signal import blackmanharris
22from scipy.signal import iirnotch
23from scipy.signal import lfilter
24
25# The default block size of pattern matching.
26ANOMALY_DETECTION_BLOCK_SIZE = 120
27
28# Only peaks with coefficient greater than 0.01 of the first peak should be
29# considered. Note that this correspond to -40dB in the spectrum.
30DEFAULT_MIN_PEAK_RATIO = 0.01
31
32# The minimum RMS value of meaningful audio data.
33MEANINGFUL_RMS_THRESHOLD = 0.001
34
35# The minimal signal norm value.
36_MINIMUM_SIGNAL_NORM = 0.001
37
38# The default pattern mathing threshold. By experiment, this threshold
39# can tolerate normal noise of 0.3 amplitude when sine wave signal
40# amplitude is 1.
41PATTERN_MATCHING_THRESHOLD = 0.85
42
43# The default number of samples within the analysis step size that the
44# difference between two anomaly time values can be to be grouped together.
45ANOMALY_GROUPING_TOLERANCE = 1.0
46
47# Window size for peak detection.
48PEAK_WINDOW_SIZE_HZ = 20
49
50
51class RMSTooSmallError(Exception):
52    """Error when signal RMS is too small."""
53
54
55class EmptyDataError(Exception):
56    """Error when signal is empty."""
57
58
59def normalize_signal(signal, saturate_value):
60    """Normalizes the signal with respect to the saturate value.
61
62    Args:
63        signal: A list for one-channel PCM data.
64        saturate_value: The maximum value that the PCM data might be.
65
66    Returns:
67        A numpy array containing normalized signal. The normalized signal has
68            value -1 and 1 when it saturates.
69
70    """
71    signal = numpy.array(signal)
72    return signal / float(saturate_value)
73
74
75def spectral_analysis(signal,
76                      rate,
77                      min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
78                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
79    """Gets the dominant frequencies by spectral analysis.
80
81    Args:
82        signal: A list of numbers for one-channel PCM data. This should be
83                   normalized to [-1, 1] so the function can check if signal RMS
84                   is too small to be meaningful.
85        rate: Sampling rate in samples per second. Example inputs: 44100,
86        48000
87        min_peak_ratio: The minimum peak_i/peak_0 ratio such that the
88                           peaks other than the greatest one should be
89                           considered.
90                           This is to ignore peaks that are too small compared
91                           to the first peak peak_0.
92        peak_window_size_hz: The window size in Hz to find the peaks.
93                                The minimum differences between found peaks will
94                                be half of this value.
95
96    Returns:
97        A list of tuples:
98              [(peak_frequency_0, peak_coefficient_0),
99               (peak_frequency_1, peak_coefficient_1),
100               (peak_frequency_2, peak_coefficient_2), ...]
101              where the tuples are sorted by coefficients. The last
102              peak_coefficient will be no less than peak_coefficient *
103              min_peak_ratio. If RMS is less than MEANINGFUL_RMS_THRESHOLD,
104              returns [(0, 0)].
105
106    """
107    # Checks the signal is meaningful.
108    if len(signal) == 0:
109        raise EmptyDataError('Signal data is empty')
110
111    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
112    logging.debug('signal RMS = %s', signal_rms)
113
114    # If RMS is too small, set dominant frequency and coefficient to 0.
115    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
116        logging.warning(
117            'RMS %s is too small to be meaningful. Set frequency to 0.',
118            signal_rms)
119        return [(0, 0)]
120
121    logging.debug('Doing spectral analysis ...')
122
123    # First, pass signal through a window function to mitigate spectral leakage.
124    y_conv_w = signal * numpy.hanning(len(signal))
125
126    length = len(y_conv_w)
127
128    # x_f is the frequency in Hz, y_f is the transformed coefficient.
129    x_f = _rfft_freq(length, rate)
130    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
131
132    # y_f is complex so consider its absolute value for magnitude.
133    abs_y_f = numpy.abs(y_f)
134    threshold = max(abs_y_f) * min_peak_ratio
135
136    # Suppresses all coefficients that are below threshold.
137    for i in range(len(abs_y_f)):
138        if abs_y_f[i] < threshold:
139            abs_y_f[i] = 0
140
141    # Gets the peak detection window size in indice.
142    # x_f[1] is the frequency difference per index.
143    peak_window_size = int(peak_window_size_hz / x_f[1])
144
145    # Detects peaks.
146    peaks = peak_detection(abs_y_f, peak_window_size)
147
148    # Transform back the peak location from index to frequency.
149    results = []
150    for index, value in peaks:
151        results.append((x_f[int(index)], value))
152    return results
153
154
155def _rfft_freq(length, rate):
156    """Gets the frequency at each index of real FFT.
157
158    Args:
159        length: The window length of FFT.
160        rate: Sampling rate in samples per second. Example inputs: 44100,
161        48000
162
163    Returns:
164        A numpy array containing frequency corresponding to numpy.fft.rfft
165            result at each index.
166
167    """
168    # The difference in Hz between each index.
169    val = rate / float(length)
170    # Only care half of frequencies for FFT on real signal.
171    result_length = length // 2 + 1
172    return numpy.linspace(0, (result_length - 1) * val, result_length)
173
174
175def peak_detection(array, window_size):
176    """Detects peaks in an array.
177
178    A point (i, array[i]) is a peak if array[i] is the maximum among
179    array[i - half_window_size] to array[i + half_window_size].
180    If array[i - half_window_size] to array[i + half_window_size] are all equal,
181    then there is no peak in this window.
182    Note that we only consider peak with value greater than 0.
183
184    Args:
185        array: The input array to detect peaks in. Array is a list of
186        absolute values of the magnitude of transformed coefficient.
187
188        window_size: The window to detect peaks.
189
190    Returns:
191        A list of tuples:
192              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
193              where the tuples are sorted by peak values.
194
195    """
196    half_window_size = window_size / 2
197    length = len(array)
198
199    def mid_is_peak(array, mid, left, right):
200        """Checks if value at mid is the largest among left to right in array.
201
202        Args:
203            array: A list of numbers.
204            mid: The mid index.
205            left: The left index.
206            rigth: The right index.
207
208        Returns:
209            A tuple (is_peak, next_candidate)
210                  is_peak is True if array[index] is the maximum among numbers
211                  in array between index [left, right] inclusively.
212                  next_candidate is the index of next candidate for peak if
213                  is_peak is False. It is the index of maximum value in
214                  [mid + 1, right]. If is_peak is True, next_candidate is
215                  right + 1.
216
217        """
218        value_mid = array[int(mid)]
219        is_peak = True
220        next_peak_candidate_index = None
221
222        # Check the left half window.
223        for index in range(int(left), int(mid)):
224            if array[index] >= value_mid:
225                is_peak = False
226                break
227
228        # Mid is at the end of array.
229        if mid == right:
230            return is_peak, right + 1
231
232        # Check the right half window and also record next candidate.
233        # Favor the larger index for next_peak_candidate_index.
234        for index in range(int(right), int(mid), -1):
235            if (next_peak_candidate_index is None
236                    or array[index] > array[next_peak_candidate_index]):
237                next_peak_candidate_index = index
238
239        if array[next_peak_candidate_index] >= value_mid:
240            is_peak = False
241
242        if is_peak:
243            next_peak_candidate_index = right + 1
244
245        return is_peak, next_peak_candidate_index
246
247    results = []
248    mid = 0
249    next_candidate_idx = None
250    while mid < length:
251        left = max(0, mid - half_window_size)
252        right = min(length - 1, mid + half_window_size)
253
254        # Only consider value greater than 0.
255        if array[int(mid)] == 0:
256            mid = mid + 1
257            continue
258
259        is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
260
261        if is_peak:
262            results.append((mid, array[int(mid)]))
263
264        # Use the next candidate found in [mid + 1, right], or right + 1.
265        mid = next_candidate_idx
266
267    # Sort the peaks by values.
268    return sorted(results, key=lambda x: x[1], reverse=True)
269
270
271def anomaly_detection(signal,
272                      rate,
273                      freq,
274                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
275                      threshold=PATTERN_MATCHING_THRESHOLD):
276    """Detects anomaly in a sine wave signal.
277
278    This method detects anomaly in a sine wave signal by matching
279    patterns of each block.
280    For each moving window of block in the test signal, checks if there
281    is any block in golden signal that is similar to this block of test signal.
282    If there is such a block in golden signal, then this block of test
283    signal is matched and there is no anomaly in this block of test signal.
284    If there is any block in test signal that is not matched, then this block
285    covers an anomaly.
286    The block of test signal starts from index 0, and proceeds in steps of
287    half block size. The overlapping of test signal blocks makes sure there must
288    be at least one block covering the transition from sine wave to anomaly.
289
290    Args:
291        signal: A 1-D array-like object for 1-channel PCM data.
292        rate: Sampling rate in samples per second. Example inputs: 44100,
293        48000
294        freq: The expected frequency of signal.
295        block_size: The block size in samples to detect anomaly.
296        threshold: The threshold of correlation index to be judge as matched.
297
298    Returns:
299        A list containing time markers in seconds that have an anomaly within
300            block_size samples.
301
302    """
303    if len(signal) == 0:
304        raise EmptyDataError('Signal data is empty')
305
306    golden_y = _generate_golden_pattern(rate, freq, block_size)
307
308    results = []
309
310    for start in range(0, len(signal), int(block_size / 2)):
311        end = start + block_size
312        test_signal = signal[start:end]
313        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
314        if not matched:
315            results.append(start)
316
317    results = [float(x) / rate for x in results]
318
319    return results
320
321
322def get_anomaly_durations(signal,
323                          rate,
324                          freq,
325                          block_size=ANOMALY_DETECTION_BLOCK_SIZE,
326                          threshold=PATTERN_MATCHING_THRESHOLD,
327                          tolerance=ANOMALY_GROUPING_TOLERANCE):
328    """Detect anomalies in a sine wav and return their start and end times.
329
330    Run anomaly_detection function and parse resulting array of time values into
331    discrete anomalies defined by a start and end time tuple. Time values are
332    judged to be part of the same anomaly if they lie within a given tolerance
333    of half the block_size number of samples of each other.
334
335    Args:
336        signal: A 1-D array-like object for 1-channel PCM data.
337        rate (int): Sampling rate in samples per second.
338            Example inputs: 44100, 48000
339        freq (int): The expected frequency of signal.
340        block_size (int): The block size in samples to detect anomaly.
341        threshold (float): The threshold of correlation index to be judge as
342            matched.
343        tolerance (float): The number of samples greater than block_size / 2
344            that the sample distance between two anomaly time values can be and
345            still be grouped as the same anomaly.
346    Returns:
347        bounds (list): a list of (start, end) tuples where start and end are the
348            boundaries in seconds of the detected anomaly.
349    """
350    bounds = []
351    anoms = anomaly_detection(signal, rate, freq, block_size, threshold)
352    if len(anoms) == 0:
353        return bounds
354    end = anoms[0]
355    start = anoms[0]
356    for i in range(len(anoms) - 1):
357        end = anoms[i]
358        sample_diff = abs(anoms[i] - anoms[i + 1]) * rate
359        # We require a tolerance because sample_diff may be slightly off due to
360        # float rounding errors in Python.
361        if sample_diff > block_size / 2 + tolerance:
362            bounds.append((start, end))
363            start = anoms[i + 1]
364    bounds.append((start, end))
365    return bounds
366
367
368def _generate_golden_pattern(rate, freq, block_size):
369    """Generates a golden pattern of certain frequency.
370
371    The golden pattern must cover all the possibilities of waveforms in a
372    block. So, we need a golden pattern covering 1 period + 1 block size,
373    such that the test block can start anywhere in a period, and extends
374    a block size.
375
376    |period |1 bk|
377    |       |    |
378     . .     . .
379    .   .   .   .
380         . .     .
381
382    Args:
383        rate: Sampling rate in samples per second. Example inputs: 44100,
384        48000
385        freq: The frequency of golden pattern.
386        block_size: The block size in samples to detect anomaly.
387
388    Returns:
389        A 1-D array for golden pattern.
390
391    """
392    samples_in_a_period = int(rate / freq) + 1
393    samples_in_golden_pattern = samples_in_a_period + block_size
394    golden_x = numpy.linspace(0.0,
395                              (samples_in_golden_pattern - 1) * 1.0 / rate,
396                              samples_in_golden_pattern)
397    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
398    return golden_y
399
400
401def _moving_pattern_matching(golden_signal, test_signal, threshold):
402    """Checks if test_signal is similar to any block of golden_signal.
403
404    Compares test signal with each block of golden signal by correlation
405    index. If there is any block of golden signal that is similar to
406    test signal, then it is matched.
407
408    Args:
409        golden_signal: A 1-D array for golden signal.
410        test_signal: A 1-D array for test signal.
411        threshold: The threshold of correlation index to be judge as matched.
412
413    Returns:
414        True if there is a match. False otherwise.
415
416        ValueError: if test signal is longer than golden signal.
417
418    """
419    if len(golden_signal) < len(test_signal):
420        raise ValueError('Test signal is longer than golden signal')
421
422    block_length = len(test_signal)
423    number_of_movings = len(golden_signal) - block_length + 1
424    correlation_indices = []
425    for moving_index in range(number_of_movings):
426        # Cuts one block of golden signal from start index.
427        # The block length is the same as test signal.
428        start = moving_index
429        end = start + block_length
430        golden_signal_block = golden_signal[start:end]
431        try:
432            correlation_index = _get_correlation_index(golden_signal_block,
433                                                       test_signal)
434        except TestSignalNormTooSmallError:
435            logging.info(
436                'Caught one block of test signal that has no meaningful norm')
437            return False
438        correlation_indices.append(correlation_index)
439
440    # Checks if the maximum correlation index is high enough.
441    max_corr = max(correlation_indices)
442    if max_corr < threshold:
443        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
444        return False
445    return True
446
447
448class GoldenSignalNormTooSmallError(Exception):
449    """Exception when golden signal norm is too small."""
450
451
452class TestSignalNormTooSmallError(Exception):
453    """Exception when test signal norm is too small."""
454
455
456def _get_correlation_index(golden_signal, test_signal):
457    """Computes correlation index of two signal of same length.
458
459    Args:
460        golden_signal: An 1-D array-like object.
461        test_signal: An 1-D array-like object.
462
463    Raises:
464        ValueError: if two signal have different lengths.
465        GoldenSignalNormTooSmallError: if golden signal norm is too small
466        TestSignalNormTooSmallError: if test signal norm is too small.
467
468    Returns:
469        The correlation index.
470    """
471    if len(golden_signal) != len(test_signal):
472        raise ValueError('Only accepts signal of same length: %s, %s' %
473                         (len(golden_signal), len(test_signal)))
474
475    norm_golden = numpy.linalg.norm(golden_signal)
476    norm_test = numpy.linalg.norm(test_signal)
477    if norm_golden <= _MINIMUM_SIGNAL_NORM:
478        raise GoldenSignalNormTooSmallError(
479            'No meaningful data as norm is too small.')
480    if norm_test <= _MINIMUM_SIGNAL_NORM:
481        raise TestSignalNormTooSmallError(
482            'No meaningful data as norm is too small.')
483
484    # The 'valid' cross correlation result of two signals of same length will
485    # contain only one number.
486    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
487    return correlation / (norm_golden * norm_test)
488
489
490def fundamental_freq(signal, rate):
491    """Return fundamental frequency of signal by finding max in freq domain.
492    """
493    dft = numpy.fft.rfft(signal)
494    fund_freq = rate * (numpy.argmax(numpy.abs(dft)) / len(signal))
495    return fund_freq
496
497
498def rms(array):
499    """Return the root mean square of array.
500    """
501    return numpy.sqrt(numpy.mean(numpy.absolute(array)**2))
502
503
504def THDN(signal, rate, q, freq):
505    """Measure the THD+N for a signal and return the results.
506    Subtract mean to center signal around 0, remove fundamental frequency from
507    dft using notch filter and transform back into signal to get noise. Compute
508    ratio of RMS of noise signal to RMS of entire signal.
509
510    Args:
511        signal: array of values representing an audio signal.
512        rate: sample rate in Hz of the signal.
513        q: quality factor for the notch filter.
514        freq: fundamental frequency of the signal. All other frequencies
515            are noise. If not specified, will be calculated using FFT.
516    Returns:
517        THDN: THD+N ratio calculated from the ratio of RMS of pure harmonics
518            and noise signal to RMS of original signal.
519    """
520    # Normalize and window signal.
521    signal -= numpy.mean(signal)
522    windowed = signal * blackmanharris(len(signal))
523    # Find fundamental frequency to remove if not specified.
524    freq = freq or fundamental_freq(windowed, rate)
525    # Create notch filter to isolate noise.
526    w0 = freq / (rate / 2.0)
527    b, a = iirnotch(w0, q)
528    noise = lfilter(b, a, windowed)
529    # Calculate THD+N.
530    THDN = rms(noise) / rms(windowed)
531    return THDN
532
533
534def max_THDN(signal, rate, step_size, window_size, q, freq):
535    """Analyze signal with moving window and find maximum THD+N value.
536    Args:
537        signal: array representing the signal
538        rate: sample rate of the signal.
539        step_size: how many samples to move the window by for each analysis.
540        window_size: how many samples to analyze each time.
541        q: quality factor for the notch filter.
542        freq: fundamental frequency of the signal. All other frequencies
543            are noise. If not specified, will be calculated using FFT.
544    Returns:
545        greatest_THDN: the greatest THD+N value found across all windows
546    """
547    greatest_THDN = 0
548    cur = 0
549    while cur + window_size < len(signal):
550        window = signal[cur:cur + window_size]
551        res = THDN(window, rate, q, freq)
552        cur += step_size
553        if res > greatest_THDN:
554            greatest_THDN = res
555    return greatest_THDN
556
557
558def get_file_THDN(filename, q, freq=None):
559    """Get THD+N values for each channel of an audio file.
560
561    Args:
562        filename (str): path to the audio file.
563          (supported file types: http://www.mega-nerd.com/libsndfile/#Features)
564        q (float): quality factor for the notch filter.
565        freq (int|float): fundamental frequency of the signal. All other
566            frequencies are noise. If None, will be calculated with FFT.
567    Returns:
568        channel_results (list): THD+N value for each channel's signal.
569            List index corresponds to channel index.
570    """
571    audio_file = soundfile.SoundFile(filename)
572    channel_results = []
573    if audio_file.channels == 1:
574        channel_results.append(
575            THDN(signal=audio_file.read(),
576                 rate=audio_file.samplerate,
577                 q=q,
578                 freq=freq))
579    else:
580        for ch_no, channel in enumerate(audio_file.read().transpose()):
581            channel_results.append(
582                THDN(signal=channel,
583                     rate=audio_file.samplerate,
584                     q=q,
585                     freq=freq))
586    return channel_results
587
588
589def get_file_max_THDN(filename, step_size, window_size, q, freq=None):
590    """Get max THD+N value across analysis windows for each channel of file.
591
592    Args:
593        filename (str): path to the audio file.
594          (supported file types: http://www.mega-nerd.com/libsndfile/#Features)
595        step_size: how many samples to move the window by for each analysis.
596        window_size: how many samples to analyze each time.
597        q (float): quality factor for the notch filter.
598        freq (int|float): fundamental frequency of the signal. All other
599            frequencies are noise. If None, will be calculated with FFT.
600    Returns:
601        channel_results (list): max THD+N value for each channel's signal.
602            List index corresponds to channel index.
603    """
604    audio_file = soundfile.SoundFile(filename)
605    channel_results = []
606    if audio_file.channels == 1:
607        channel_results.append(
608            max_THDN(signal=audio_file.read(),
609                     rate=audio_file.samplerate,
610                     step_size=step_size,
611                     window_size=window_size,
612                     q=q,
613                     freq=freq))
614    else:
615        for ch_no, channel in enumerate(audio_file.read().transpose()):
616            channel_results.append(
617                max_THDN(signal=channel,
618                         rate=audio_file.samplerate,
619                         step_size=step_size,
620                         window_size=window_size,
621                         q=q,
622                         freq=freq))
623    return channel_results
624
625
626def get_file_anomaly_durations(filename,
627                               freq=None,
628                               block_size=ANOMALY_DETECTION_BLOCK_SIZE,
629                               threshold=PATTERN_MATCHING_THRESHOLD,
630                               tolerance=ANOMALY_GROUPING_TOLERANCE):
631    """Get durations of anomalies for each channel of audio file.
632
633    Args:
634        filename (str): path to the audio file.
635          (supported file types: http://www.mega-nerd.com/libsndfile/#Features)
636        freq (int|float): fundamental frequency of the signal. All other
637            frequencies are noise. If None, will be calculated with FFT.
638        block_size (int): The block size in samples to detect anomaly.
639        threshold (float): The threshold of correlation index to be judge as
640            matched.
641        tolerance (float): The number of samples greater than block_size / 2
642            that the sample distance between two anomaly time values can be and
643            still be grouped as the same anomaly.
644    Returns:
645        channel_results (list): anomaly durations for each channel's signal.
646            List index corresponds to channel index.
647    """
648    audio_file = soundfile.SoundFile(filename)
649    signal = audio_file.read()
650    freq = freq or fundamental_freq(signal, audio_file.samplerate)
651    channel_results = []
652    if audio_file.channels == 1:
653        channel_results.append(
654            get_anomaly_durations(signal=signal,
655                                  rate=audio_file.samplerate,
656                                  freq=freq,
657                                  block_size=block_size,
658                                  threshold=threshold,
659                                  tolerance=tolerance))
660    else:
661        for ch_no, channel in enumerate(signal.transpose()):
662            channel_results.append(
663                get_anomaly_durations(signal=channel,
664                                      rate=audio_file.samplerate,
665                                      freq=freq,
666                                      block_size=block_size,
667                                      threshold=threshold,
668                                      tolerance=tolerance))
669    return channel_results
670