1#!/usr/bin/env python3 2# 3# Copyright 2017 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""This module provides utilities to do audio data analysis.""" 17 18import logging 19import numpy 20import soundfile 21from scipy.signal import blackmanharris 22from scipy.signal import iirnotch 23from scipy.signal import lfilter 24 25# The default block size of pattern matching. 26ANOMALY_DETECTION_BLOCK_SIZE = 120 27 28# Only peaks with coefficient greater than 0.01 of the first peak should be 29# considered. Note that this correspond to -40dB in the spectrum. 30DEFAULT_MIN_PEAK_RATIO = 0.01 31 32# The minimum RMS value of meaningful audio data. 33MEANINGFUL_RMS_THRESHOLD = 0.001 34 35# The minimal signal norm value. 36_MINIMUM_SIGNAL_NORM = 0.001 37 38# The default pattern mathing threshold. By experiment, this threshold 39# can tolerate normal noise of 0.3 amplitude when sine wave signal 40# amplitude is 1. 41PATTERN_MATCHING_THRESHOLD = 0.85 42 43# The default number of samples within the analysis step size that the 44# difference between two anomaly time values can be to be grouped together. 45ANOMALY_GROUPING_TOLERANCE = 1.0 46 47# Window size for peak detection. 48PEAK_WINDOW_SIZE_HZ = 20 49 50 51class RMSTooSmallError(Exception): 52 """Error when signal RMS is too small.""" 53 54 55class EmptyDataError(Exception): 56 """Error when signal is empty.""" 57 58 59def normalize_signal(signal, saturate_value): 60 """Normalizes the signal with respect to the saturate value. 61 62 Args: 63 signal: A list for one-channel PCM data. 64 saturate_value: The maximum value that the PCM data might be. 65 66 Returns: 67 A numpy array containing normalized signal. The normalized signal has 68 value -1 and 1 when it saturates. 69 70 """ 71 signal = numpy.array(signal) 72 return signal / float(saturate_value) 73 74 75def spectral_analysis(signal, 76 rate, 77 min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, 78 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): 79 """Gets the dominant frequencies by spectral analysis. 80 81 Args: 82 signal: A list of numbers for one-channel PCM data. This should be 83 normalized to [-1, 1] so the function can check if signal RMS 84 is too small to be meaningful. 85 rate: Sampling rate in samples per second. Example inputs: 44100, 86 48000 87 min_peak_ratio: The minimum peak_i/peak_0 ratio such that the 88 peaks other than the greatest one should be 89 considered. 90 This is to ignore peaks that are too small compared 91 to the first peak peak_0. 92 peak_window_size_hz: The window size in Hz to find the peaks. 93 The minimum differences between found peaks will 94 be half of this value. 95 96 Returns: 97 A list of tuples: 98 [(peak_frequency_0, peak_coefficient_0), 99 (peak_frequency_1, peak_coefficient_1), 100 (peak_frequency_2, peak_coefficient_2), ...] 101 where the tuples are sorted by coefficients. The last 102 peak_coefficient will be no less than peak_coefficient * 103 min_peak_ratio. If RMS is less than MEANINGFUL_RMS_THRESHOLD, 104 returns [(0, 0)]. 105 106 """ 107 # Checks the signal is meaningful. 108 if len(signal) == 0: 109 raise EmptyDataError('Signal data is empty') 110 111 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) 112 logging.debug('signal RMS = %s', signal_rms) 113 114 # If RMS is too small, set dominant frequency and coefficient to 0. 115 if signal_rms < MEANINGFUL_RMS_THRESHOLD: 116 logging.warning( 117 'RMS %s is too small to be meaningful. Set frequency to 0.', 118 signal_rms) 119 return [(0, 0)] 120 121 logging.debug('Doing spectral analysis ...') 122 123 # First, pass signal through a window function to mitigate spectral leakage. 124 y_conv_w = signal * numpy.hanning(len(signal)) 125 126 length = len(y_conv_w) 127 128 # x_f is the frequency in Hz, y_f is the transformed coefficient. 129 x_f = _rfft_freq(length, rate) 130 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) 131 132 # y_f is complex so consider its absolute value for magnitude. 133 abs_y_f = numpy.abs(y_f) 134 threshold = max(abs_y_f) * min_peak_ratio 135 136 # Suppresses all coefficients that are below threshold. 137 for i in range(len(abs_y_f)): 138 if abs_y_f[i] < threshold: 139 abs_y_f[i] = 0 140 141 # Gets the peak detection window size in indice. 142 # x_f[1] is the frequency difference per index. 143 peak_window_size = int(peak_window_size_hz / x_f[1]) 144 145 # Detects peaks. 146 peaks = peak_detection(abs_y_f, peak_window_size) 147 148 # Transform back the peak location from index to frequency. 149 results = [] 150 for index, value in peaks: 151 results.append((x_f[int(index)], value)) 152 return results 153 154 155def _rfft_freq(length, rate): 156 """Gets the frequency at each index of real FFT. 157 158 Args: 159 length: The window length of FFT. 160 rate: Sampling rate in samples per second. Example inputs: 44100, 161 48000 162 163 Returns: 164 A numpy array containing frequency corresponding to numpy.fft.rfft 165 result at each index. 166 167 """ 168 # The difference in Hz between each index. 169 val = rate / float(length) 170 # Only care half of frequencies for FFT on real signal. 171 result_length = length // 2 + 1 172 return numpy.linspace(0, (result_length - 1) * val, result_length) 173 174 175def peak_detection(array, window_size): 176 """Detects peaks in an array. 177 178 A point (i, array[i]) is a peak if array[i] is the maximum among 179 array[i - half_window_size] to array[i + half_window_size]. 180 If array[i - half_window_size] to array[i + half_window_size] are all equal, 181 then there is no peak in this window. 182 Note that we only consider peak with value greater than 0. 183 184 Args: 185 array: The input array to detect peaks in. Array is a list of 186 absolute values of the magnitude of transformed coefficient. 187 188 window_size: The window to detect peaks. 189 190 Returns: 191 A list of tuples: 192 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] 193 where the tuples are sorted by peak values. 194 195 """ 196 half_window_size = window_size / 2 197 length = len(array) 198 199 def mid_is_peak(array, mid, left, right): 200 """Checks if value at mid is the largest among left to right in array. 201 202 Args: 203 array: A list of numbers. 204 mid: The mid index. 205 left: The left index. 206 rigth: The right index. 207 208 Returns: 209 A tuple (is_peak, next_candidate) 210 is_peak is True if array[index] is the maximum among numbers 211 in array between index [left, right] inclusively. 212 next_candidate is the index of next candidate for peak if 213 is_peak is False. It is the index of maximum value in 214 [mid + 1, right]. If is_peak is True, next_candidate is 215 right + 1. 216 217 """ 218 value_mid = array[int(mid)] 219 is_peak = True 220 next_peak_candidate_index = None 221 222 # Check the left half window. 223 for index in range(int(left), int(mid)): 224 if array[index] >= value_mid: 225 is_peak = False 226 break 227 228 # Mid is at the end of array. 229 if mid == right: 230 return is_peak, right + 1 231 232 # Check the right half window and also record next candidate. 233 # Favor the larger index for next_peak_candidate_index. 234 for index in range(int(right), int(mid), -1): 235 if (next_peak_candidate_index is None 236 or array[index] > array[next_peak_candidate_index]): 237 next_peak_candidate_index = index 238 239 if array[next_peak_candidate_index] >= value_mid: 240 is_peak = False 241 242 if is_peak: 243 next_peak_candidate_index = right + 1 244 245 return is_peak, next_peak_candidate_index 246 247 results = [] 248 mid = 0 249 next_candidate_idx = None 250 while mid < length: 251 left = max(0, mid - half_window_size) 252 right = min(length - 1, mid + half_window_size) 253 254 # Only consider value greater than 0. 255 if array[int(mid)] == 0: 256 mid = mid + 1 257 continue 258 259 is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right) 260 261 if is_peak: 262 results.append((mid, array[int(mid)])) 263 264 # Use the next candidate found in [mid + 1, right], or right + 1. 265 mid = next_candidate_idx 266 267 # Sort the peaks by values. 268 return sorted(results, key=lambda x: x[1], reverse=True) 269 270 271def anomaly_detection(signal, 272 rate, 273 freq, 274 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 275 threshold=PATTERN_MATCHING_THRESHOLD): 276 """Detects anomaly in a sine wave signal. 277 278 This method detects anomaly in a sine wave signal by matching 279 patterns of each block. 280 For each moving window of block in the test signal, checks if there 281 is any block in golden signal that is similar to this block of test signal. 282 If there is such a block in golden signal, then this block of test 283 signal is matched and there is no anomaly in this block of test signal. 284 If there is any block in test signal that is not matched, then this block 285 covers an anomaly. 286 The block of test signal starts from index 0, and proceeds in steps of 287 half block size. The overlapping of test signal blocks makes sure there must 288 be at least one block covering the transition from sine wave to anomaly. 289 290 Args: 291 signal: A 1-D array-like object for 1-channel PCM data. 292 rate: Sampling rate in samples per second. Example inputs: 44100, 293 48000 294 freq: The expected frequency of signal. 295 block_size: The block size in samples to detect anomaly. 296 threshold: The threshold of correlation index to be judge as matched. 297 298 Returns: 299 A list containing time markers in seconds that have an anomaly within 300 block_size samples. 301 302 """ 303 if len(signal) == 0: 304 raise EmptyDataError('Signal data is empty') 305 306 golden_y = _generate_golden_pattern(rate, freq, block_size) 307 308 results = [] 309 310 for start in range(0, len(signal), int(block_size / 2)): 311 end = start + block_size 312 test_signal = signal[start:end] 313 matched = _moving_pattern_matching(golden_y, test_signal, threshold) 314 if not matched: 315 results.append(start) 316 317 results = [float(x) / rate for x in results] 318 319 return results 320 321 322def get_anomaly_durations(signal, 323 rate, 324 freq, 325 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 326 threshold=PATTERN_MATCHING_THRESHOLD, 327 tolerance=ANOMALY_GROUPING_TOLERANCE): 328 """Detect anomalies in a sine wav and return their start and end times. 329 330 Run anomaly_detection function and parse resulting array of time values into 331 discrete anomalies defined by a start and end time tuple. Time values are 332 judged to be part of the same anomaly if they lie within a given tolerance 333 of half the block_size number of samples of each other. 334 335 Args: 336 signal: A 1-D array-like object for 1-channel PCM data. 337 rate (int): Sampling rate in samples per second. 338 Example inputs: 44100, 48000 339 freq (int): The expected frequency of signal. 340 block_size (int): The block size in samples to detect anomaly. 341 threshold (float): The threshold of correlation index to be judge as 342 matched. 343 tolerance (float): The number of samples greater than block_size / 2 344 that the sample distance between two anomaly time values can be and 345 still be grouped as the same anomaly. 346 Returns: 347 bounds (list): a list of (start, end) tuples where start and end are the 348 boundaries in seconds of the detected anomaly. 349 """ 350 bounds = [] 351 anoms = anomaly_detection(signal, rate, freq, block_size, threshold) 352 if len(anoms) == 0: 353 return bounds 354 end = anoms[0] 355 start = anoms[0] 356 for i in range(len(anoms) - 1): 357 end = anoms[i] 358 sample_diff = abs(anoms[i] - anoms[i + 1]) * rate 359 # We require a tolerance because sample_diff may be slightly off due to 360 # float rounding errors in Python. 361 if sample_diff > block_size / 2 + tolerance: 362 bounds.append((start, end)) 363 start = anoms[i + 1] 364 bounds.append((start, end)) 365 return bounds 366 367 368def _generate_golden_pattern(rate, freq, block_size): 369 """Generates a golden pattern of certain frequency. 370 371 The golden pattern must cover all the possibilities of waveforms in a 372 block. So, we need a golden pattern covering 1 period + 1 block size, 373 such that the test block can start anywhere in a period, and extends 374 a block size. 375 376 |period |1 bk| 377 | | | 378 . . . . 379 . . . . 380 . . . 381 382 Args: 383 rate: Sampling rate in samples per second. Example inputs: 44100, 384 48000 385 freq: The frequency of golden pattern. 386 block_size: The block size in samples to detect anomaly. 387 388 Returns: 389 A 1-D array for golden pattern. 390 391 """ 392 samples_in_a_period = int(rate / freq) + 1 393 samples_in_golden_pattern = samples_in_a_period + block_size 394 golden_x = numpy.linspace(0.0, 395 (samples_in_golden_pattern - 1) * 1.0 / rate, 396 samples_in_golden_pattern) 397 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) 398 return golden_y 399 400 401def _moving_pattern_matching(golden_signal, test_signal, threshold): 402 """Checks if test_signal is similar to any block of golden_signal. 403 404 Compares test signal with each block of golden signal by correlation 405 index. If there is any block of golden signal that is similar to 406 test signal, then it is matched. 407 408 Args: 409 golden_signal: A 1-D array for golden signal. 410 test_signal: A 1-D array for test signal. 411 threshold: The threshold of correlation index to be judge as matched. 412 413 Returns: 414 True if there is a match. False otherwise. 415 416 ValueError: if test signal is longer than golden signal. 417 418 """ 419 if len(golden_signal) < len(test_signal): 420 raise ValueError('Test signal is longer than golden signal') 421 422 block_length = len(test_signal) 423 number_of_movings = len(golden_signal) - block_length + 1 424 correlation_indices = [] 425 for moving_index in range(number_of_movings): 426 # Cuts one block of golden signal from start index. 427 # The block length is the same as test signal. 428 start = moving_index 429 end = start + block_length 430 golden_signal_block = golden_signal[start:end] 431 try: 432 correlation_index = _get_correlation_index(golden_signal_block, 433 test_signal) 434 except TestSignalNormTooSmallError: 435 logging.info( 436 'Caught one block of test signal that has no meaningful norm') 437 return False 438 correlation_indices.append(correlation_index) 439 440 # Checks if the maximum correlation index is high enough. 441 max_corr = max(correlation_indices) 442 if max_corr < threshold: 443 logging.debug('Got one unmatched block with max_corr: %s', max_corr) 444 return False 445 return True 446 447 448class GoldenSignalNormTooSmallError(Exception): 449 """Exception when golden signal norm is too small.""" 450 451 452class TestSignalNormTooSmallError(Exception): 453 """Exception when test signal norm is too small.""" 454 455 456def _get_correlation_index(golden_signal, test_signal): 457 """Computes correlation index of two signal of same length. 458 459 Args: 460 golden_signal: An 1-D array-like object. 461 test_signal: An 1-D array-like object. 462 463 Raises: 464 ValueError: if two signal have different lengths. 465 GoldenSignalNormTooSmallError: if golden signal norm is too small 466 TestSignalNormTooSmallError: if test signal norm is too small. 467 468 Returns: 469 The correlation index. 470 """ 471 if len(golden_signal) != len(test_signal): 472 raise ValueError('Only accepts signal of same length: %s, %s' % 473 (len(golden_signal), len(test_signal))) 474 475 norm_golden = numpy.linalg.norm(golden_signal) 476 norm_test = numpy.linalg.norm(test_signal) 477 if norm_golden <= _MINIMUM_SIGNAL_NORM: 478 raise GoldenSignalNormTooSmallError( 479 'No meaningful data as norm is too small.') 480 if norm_test <= _MINIMUM_SIGNAL_NORM: 481 raise TestSignalNormTooSmallError( 482 'No meaningful data as norm is too small.') 483 484 # The 'valid' cross correlation result of two signals of same length will 485 # contain only one number. 486 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] 487 return correlation / (norm_golden * norm_test) 488 489 490def fundamental_freq(signal, rate): 491 """Return fundamental frequency of signal by finding max in freq domain. 492 """ 493 dft = numpy.fft.rfft(signal) 494 fund_freq = rate * (numpy.argmax(numpy.abs(dft)) / len(signal)) 495 return fund_freq 496 497 498def rms(array): 499 """Return the root mean square of array. 500 """ 501 return numpy.sqrt(numpy.mean(numpy.absolute(array)**2)) 502 503 504def THDN(signal, rate, q, freq): 505 """Measure the THD+N for a signal and return the results. 506 Subtract mean to center signal around 0, remove fundamental frequency from 507 dft using notch filter and transform back into signal to get noise. Compute 508 ratio of RMS of noise signal to RMS of entire signal. 509 510 Args: 511 signal: array of values representing an audio signal. 512 rate: sample rate in Hz of the signal. 513 q: quality factor for the notch filter. 514 freq: fundamental frequency of the signal. All other frequencies 515 are noise. If not specified, will be calculated using FFT. 516 Returns: 517 THDN: THD+N ratio calculated from the ratio of RMS of pure harmonics 518 and noise signal to RMS of original signal. 519 """ 520 # Normalize and window signal. 521 signal -= numpy.mean(signal) 522 windowed = signal * blackmanharris(len(signal)) 523 # Find fundamental frequency to remove if not specified. 524 freq = freq or fundamental_freq(windowed, rate) 525 # Create notch filter to isolate noise. 526 w0 = freq / (rate / 2.0) 527 b, a = iirnotch(w0, q) 528 noise = lfilter(b, a, windowed) 529 # Calculate THD+N. 530 THDN = rms(noise) / rms(windowed) 531 return THDN 532 533 534def max_THDN(signal, rate, step_size, window_size, q, freq): 535 """Analyze signal with moving window and find maximum THD+N value. 536 Args: 537 signal: array representing the signal 538 rate: sample rate of the signal. 539 step_size: how many samples to move the window by for each analysis. 540 window_size: how many samples to analyze each time. 541 q: quality factor for the notch filter. 542 freq: fundamental frequency of the signal. All other frequencies 543 are noise. If not specified, will be calculated using FFT. 544 Returns: 545 greatest_THDN: the greatest THD+N value found across all windows 546 """ 547 greatest_THDN = 0 548 cur = 0 549 while cur + window_size < len(signal): 550 window = signal[cur:cur + window_size] 551 res = THDN(window, rate, q, freq) 552 cur += step_size 553 if res > greatest_THDN: 554 greatest_THDN = res 555 return greatest_THDN 556 557 558def get_file_THDN(filename, q, freq=None): 559 """Get THD+N values for each channel of an audio file. 560 561 Args: 562 filename (str): path to the audio file. 563 (supported file types: http://www.mega-nerd.com/libsndfile/#Features) 564 q (float): quality factor for the notch filter. 565 freq (int|float): fundamental frequency of the signal. All other 566 frequencies are noise. If None, will be calculated with FFT. 567 Returns: 568 channel_results (list): THD+N value for each channel's signal. 569 List index corresponds to channel index. 570 """ 571 audio_file = soundfile.SoundFile(filename) 572 channel_results = [] 573 if audio_file.channels == 1: 574 channel_results.append( 575 THDN(signal=audio_file.read(), 576 rate=audio_file.samplerate, 577 q=q, 578 freq=freq)) 579 else: 580 for ch_no, channel in enumerate(audio_file.read().transpose()): 581 channel_results.append( 582 THDN(signal=channel, 583 rate=audio_file.samplerate, 584 q=q, 585 freq=freq)) 586 return channel_results 587 588 589def get_file_max_THDN(filename, step_size, window_size, q, freq=None): 590 """Get max THD+N value across analysis windows for each channel of file. 591 592 Args: 593 filename (str): path to the audio file. 594 (supported file types: http://www.mega-nerd.com/libsndfile/#Features) 595 step_size: how many samples to move the window by for each analysis. 596 window_size: how many samples to analyze each time. 597 q (float): quality factor for the notch filter. 598 freq (int|float): fundamental frequency of the signal. All other 599 frequencies are noise. If None, will be calculated with FFT. 600 Returns: 601 channel_results (list): max THD+N value for each channel's signal. 602 List index corresponds to channel index. 603 """ 604 audio_file = soundfile.SoundFile(filename) 605 channel_results = [] 606 if audio_file.channels == 1: 607 channel_results.append( 608 max_THDN(signal=audio_file.read(), 609 rate=audio_file.samplerate, 610 step_size=step_size, 611 window_size=window_size, 612 q=q, 613 freq=freq)) 614 else: 615 for ch_no, channel in enumerate(audio_file.read().transpose()): 616 channel_results.append( 617 max_THDN(signal=channel, 618 rate=audio_file.samplerate, 619 step_size=step_size, 620 window_size=window_size, 621 q=q, 622 freq=freq)) 623 return channel_results 624 625 626def get_file_anomaly_durations(filename, 627 freq=None, 628 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 629 threshold=PATTERN_MATCHING_THRESHOLD, 630 tolerance=ANOMALY_GROUPING_TOLERANCE): 631 """Get durations of anomalies for each channel of audio file. 632 633 Args: 634 filename (str): path to the audio file. 635 (supported file types: http://www.mega-nerd.com/libsndfile/#Features) 636 freq (int|float): fundamental frequency of the signal. All other 637 frequencies are noise. If None, will be calculated with FFT. 638 block_size (int): The block size in samples to detect anomaly. 639 threshold (float): The threshold of correlation index to be judge as 640 matched. 641 tolerance (float): The number of samples greater than block_size / 2 642 that the sample distance between two anomaly time values can be and 643 still be grouped as the same anomaly. 644 Returns: 645 channel_results (list): anomaly durations for each channel's signal. 646 List index corresponds to channel index. 647 """ 648 audio_file = soundfile.SoundFile(filename) 649 signal = audio_file.read() 650 freq = freq or fundamental_freq(signal, audio_file.samplerate) 651 channel_results = [] 652 if audio_file.channels == 1: 653 channel_results.append( 654 get_anomaly_durations(signal=signal, 655 rate=audio_file.samplerate, 656 freq=freq, 657 block_size=block_size, 658 threshold=threshold, 659 tolerance=tolerance)) 660 else: 661 for ch_no, channel in enumerate(signal.transpose()): 662 channel_results.append( 663 get_anomaly_durations(signal=channel, 664 rate=audio_file.samplerate, 665 freq=freq, 666 block_size=block_size, 667 threshold=threshold, 668 tolerance=tolerance)) 669 return channel_results 670