# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import subprocess
import wave

from floss.pandora.floss import audio_test_data
from floss.pandora.floss import cras_utils
from floss.pandora.floss import sox_utils
from floss.pandora.floss import utils

CRAS_BLUETOOTH_OUTPUT_NODE_TYPE = 'BLUETOOTH'

AUDIO_TEST_DIR = '/tmp/audio'

A2DP_TEST_DATA = {
    'rate': 48000,
    'channels': 2,
    'frequencies': (440, 20000),
    'file': os.path.join(AUDIO_TEST_DIR, 'binaural_sine_440hz_20000hz_rate48000_5secs.wav'),
    'recorded_by_sink': os.path.join(AUDIO_TEST_DIR, 'a2dp_recorded_by_sink.wav'),
    'chunk_in_secs': 5,
    'bit_width': 16,
    'format': 'S16_LE',
    'duration': 5,
}

A2DP_PLAYBACK_DATA = {
    'rate': 44100,
    'channels': 2,
    'file': os.path.join(AUDIO_TEST_DIR, 'audio_playback.wav'),
    'sample_width': 2
}

SAMPLE_FORMATS = dict(S32_LE=dict(message='Signed 32-bit integer, little-endian', dtype_str='<i', size_bytes=4),
                      S16_LE=dict(message='Signed 16-bit integer, little-endian', dtype_str='<i', size_bytes=2))


def _ensure_parent_dir(path):
    """Creates the directory that will contain path, if it is missing.

    Args:
        path: The path to a file whose parent directory must exist.
    """
    directory = os.path.dirname(path)
    # A bare file name has no directory component and os.makedirs('') raises,
    # so there is nothing to create in that case.
    if directory:
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(directory, exist_ok=True)


@utils.dbus_safe(None)
def get_selected_output_device_type():
    """Gets the selected audio output node type.

    Returns:
        The node type of the selected output device, converted to str.
    """
    return str(cras_utils.get_selected_output_device_type())


@utils.dbus_safe(None)
def select_output_node(node_type):
    """Selects the audio output node.

    Args:
        node_type: The node type of the Bluetooth peer device.

    Returns:
        True if the operation succeeds.
    """
    return cras_utils.set_single_selected_output_node(node_type)


def select_audio_output_node():
    """Selects the Bluetooth audio output node through cras.

    Requests the CRAS_BLUETOOTH_OUTPUT_NODE_TYPE node and then polls until it
    is reported as the selected output device type.

    Returns:
        True if the node was selected and observed as active; False if the
        selection request returned a falsy value or polling raised TimeoutError.
    """

    def bluetooth_type_selected(node_type):
        """Checks if the bluetooth node type is selected."""
        selected = get_selected_output_device_type()
        logging.debug('active output node type: %s, expected %s', selected, node_type)
        return selected == node_type

    node_type = CRAS_BLUETOOTH_OUTPUT_NODE_TYPE
    if not select_output_node(node_type):
        return False

    desc = 'waiting for %s as active cras audio output node type' % node_type
    try:
        utils.poll_for_condition(condition=lambda: bluetooth_type_selected(node_type),
                                 timeout=20,
                                 sleep_interval=1,
                                 desc=desc)
    except TimeoutError:
        return False
    return True


def generate_audio_test_data(path, data_format=None, frequencies=None, duration_secs=None, volume_scale=None):
    """Generates audio test data with specified format and frequencies.

    Args:
        path: The path to the file.
        data_format: A dict containing data format including
                     file_type, sample_format, channel, and rate.
                     file_type: file type e.g. 'raw' or 'wav'.
                     sample_format: One of the keys in SAMPLE_FORMATS.
                     channel: number of channels.
                     rate: sampling rate.
                     Defaults to 2-channel 48000 Hz S16_LE wav when None.
        frequencies: A list containing the frequency of each channel in this file.
                     Only applicable to data of sine tone.
        duration_secs: Duration of test file in seconds.
        volume_scale: A float for volume scale used in sox command.
                      E.g. 0.5 to scale volume by half. -1.0 to invert.

    Returns:
        An AudioTestData object.

    Raises:
        KeyError: If data_format['sample_format'] is not a key of SAMPLE_FORMATS.
        subprocess.CalledProcessError: If the generated command exits non-zero.
    """
    sox_file_path = path

    if data_format is None:
        data_format = dict(file_type='wav', sample_format='S16_LE', channel=2, rate=48000)

    sample_format = SAMPLE_FORMATS[data_format['sample_format']]
    # The sample table stores the width in bytes; the command takes bits.
    bits = sample_format['size_bytes'] * 8

    command = sox_utils.generate_sine_tone_cmd(filename=sox_file_path,
                                               channels=data_format['channel'],
                                               bits=bits,
                                               rate=data_format['rate'],
                                               duration=duration_secs,
                                               frequencies=frequencies,
                                               vol=volume_scale,
                                               raw=(data_format['file_type'] == 'raw'))

    logging.info(' '.join(command))
    subprocess.check_call(command)

    test_data = audio_test_data.AudioTestData(data_format=data_format,
                                              path=sox_file_path,
                                              frequencies=frequencies,
                                              duration_secs=duration_secs)

    return test_data


def generate_playback_file(audio_data):
    """Generates the playback file if it does not exist yet.

    Some audio test files may be large. Generate them on the fly to save the storage of the source tree.

    Args:
        audio_data: The audio test data. A dict from which the 'file',
                    'channels', 'rate', 'duration' and 'frequencies' keys are read.
    """
    _ensure_parent_dir(audio_data['file'])

    if not os.path.exists(audio_data['file']):
        data_format = dict(file_type='wav',
                           sample_format='S16_LE',
                           channel=audio_data['channels'],
                           rate=audio_data['rate'])
        generate_audio_test_data(data_format=data_format,
                                 path=audio_data['file'],
                                 duration_secs=audio_data['duration'],
                                 frequencies=audio_data['frequencies'])
        logging.debug('Audio file generated: %s', audio_data['file'])


def generate_playback_file_from_binary_data(audio_data):
    """Generates wav audio file from binary audio data.

    The file is written to A2DP_PLAYBACK_DATA['file'] using the channel count,
    frame rate and sample width declared in A2DP_PLAYBACK_DATA. An existing
    file at that path is overwritten.

    Args:
        audio_data: The binary audio data.
    """
    _ensure_parent_dir(A2DP_PLAYBACK_DATA['file'])

    with wave.open(A2DP_PLAYBACK_DATA['file'], 'wb') as wav_file:
        wav_file.setnchannels(A2DP_PLAYBACK_DATA['channels'])
        wav_file.setframerate(A2DP_PLAYBACK_DATA['rate'])
        wav_file.setsampwidth(A2DP_PLAYBACK_DATA['sample_width'])
        wav_file.writeframes(audio_data)

    logging.debug('wav file generated from binary data: %s', A2DP_PLAYBACK_DATA['file'])