1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the 'License');
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an 'AS IS' BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import os
17import subprocess
18import wave
19
20from floss.pandora.floss import audio_test_data
21from floss.pandora.floss import cras_utils
22from floss.pandora.floss import sox_utils
23from floss.pandora.floss import utils
24
# Node type string used when selecting and checking the Bluetooth output node in cras.
CRAS_BLUETOOTH_OUTPUT_NODE_TYPE = 'BLUETOOTH'

# Directory that holds the generated and recorded audio test files.
AUDIO_TEST_DIR = '/tmp/audio'

# Description of the two-channel sine tone file used as A2DP test data.
# 'file' is the playback file; 'recorded_by_sink' is the path for the sink-side recording.
A2DP_TEST_DATA = dict(
    rate=48000,
    channels=2,
    frequencies=(440, 20000),
    file=os.path.join(AUDIO_TEST_DIR, 'binaural_sine_440hz_20000hz_rate48000_5secs.wav'),
    recorded_by_sink=os.path.join(AUDIO_TEST_DIR, 'a2dp_recorded_by_sink.wav'),
    chunk_in_secs=5,
    bit_width=16,
    format='S16_LE',
    duration=5,
)

# Wav parameters used when binary playback data is written out to a file.
# 'sample_width' is passed to wave.setsampwidth(), i.e. it is a size in bytes.
A2DP_PLAYBACK_DATA = dict(
    rate=44100,
    channels=2,
    file=os.path.join(AUDIO_TEST_DIR, 'audio_playback.wav'),
    sample_width=2,
)

# Supported sample formats, keyed by format name.
SAMPLE_FORMATS = {
    'S32_LE': {
        'message': 'Signed 32-bit integer, little-endian',
        'dtype_str': '<i',
        'size_bytes': 4,
    },
    'S16_LE': {
        'message': 'Signed 16-bit integer, little-endian',
        'dtype_str': '<i',
        'size_bytes': 2,
    },
}
50
51
@utils.dbus_safe(None)
def get_selected_output_device_type():
    """Queries cras for the type of the currently selected output node.

    NOTE(review): the dbus_safe(None) decorator presumably yields None when
    the underlying dbus call fails -- confirm against utils.dbus_safe.

    Returns:
        The selected output node type reported by cras_utils, converted to str.
    """
    selected_type = cras_utils.get_selected_output_device_type()
    return str(selected_type)
60
61
@utils.dbus_safe(None)
def select_output_node(node_type):
    """Asks cras to make the given node type the single selected output node.

    NOTE(review): the dbus_safe(None) decorator presumably yields None when
    the underlying dbus call fails -- confirm against utils.dbus_safe.

    Args:
        node_type: The node type of the Bluetooth peer device.

    Returns:
        The result of cras_utils.set_single_selected_output_node(); truthy if
        the operation succeeds.
    """
    succeeded = cras_utils.set_single_selected_output_node(node_type)
    return succeeded
73
74
def select_audio_output_node():
    """Selects the Bluetooth audio output node through cras and waits until it is active.

    Returns:
        True once the selected output node type matches the Bluetooth node
        type. False if the selection request fails, or if the node type is
        not observed as selected before polling times out.
    """
    expected_type = CRAS_BLUETOOTH_OUTPUT_NODE_TYPE

    def expected_type_is_active():
        """Reports whether cras currently has the expected node type selected."""
        active_type = get_selected_output_device_type()
        logging.debug('active output node type: %s, expected %s', active_type, expected_type)
        return active_type == expected_type

    # A falsy result (including None) means the selection request did not succeed.
    if not select_output_node(expected_type):
        return False

    wait_desc = 'waiting for %s as active cras audio output node type' % expected_type
    try:
        utils.poll_for_condition(condition=expected_type_is_active,
                                 timeout=20,
                                 sleep_interval=1,
                                 desc=wait_desc)
    except TimeoutError:
        # NOTE(review): assumes utils.poll_for_condition raises the builtin
        # TimeoutError when it gives up -- confirm against utils.
        return False
    else:
        return True
97
98
def generate_audio_test_data(path, data_format=None, frequencies=None, duration_secs=None, volume_scale=None):
    """Creates a sine tone audio file with sox and describes it as test data.

    Args:
        path: The path of the file to generate.
        data_format: A dict describing the data format with the keys
                     file_type, sample_format, channel, and rate.
                     file_type: file type e.g. 'raw' or 'wav'.
                     sample_format: One of the keys in SAMPLE_FORMATS.
                     channel: number of channels.
                     rate: sampling rate.
                     Defaults to a 2-channel, 48000 rate, S16_LE wav file.
        frequencies: A list containing the frequency of each channel in this file.
                     Only applicable to data of sine tone.
        duration_secs: Duration of test file in seconds.
        volume_scale: A float for volume scale used in sox command.
                      E.g. 0.5 to scale volume by half. -1.0 to invert.

    Returns:
        An AudioTestData object.

    Raises:
        subprocess.CalledProcessError: If the sox command exits with a non-zero status.
    """
    if data_format is None:
        data_format = {'file_type': 'wav', 'sample_format': 'S16_LE', 'channel': 2, 'rate': 48000}

    # sox expects the sample size in bits; SAMPLE_FORMATS stores it in bytes.
    bits_per_sample = SAMPLE_FORMATS[data_format['sample_format']]['size_bytes'] * 8
    channel_count = data_format['channel']
    sample_rate = data_format['rate']
    is_raw = data_format['file_type'] == 'raw'

    sox_command = sox_utils.generate_sine_tone_cmd(filename=path,
                                                   channels=channel_count,
                                                   bits=bits_per_sample,
                                                   rate=sample_rate,
                                                   duration=duration_secs,
                                                   frequencies=frequencies,
                                                   vol=volume_scale,
                                                   raw=is_raw)

    logging.info(' '.join(sox_command))
    subprocess.check_call(sox_command)

    return audio_test_data.AudioTestData(data_format=data_format,
                                         path=path,
                                         frequencies=frequencies,
                                         duration_secs=duration_secs)
145
146
def generate_playback_file(audio_data):
    """Generates the playback file if it does not exist yet.

    Some audio test files may be large. Generate them on the fly to save the storage of the source tree.

    Args:
        audio_data: The audio test data, a dict. 'file' is always read; 'channels',
                    'rate', 'duration' and 'frequencies' are read only when the
                    file has to be generated.
    """
    file_path = audio_data['file']
    directory = os.path.dirname(file_path)
    # dirname is '' for a bare file name: the current directory already exists
    # and os.makedirs('') would raise. exist_ok avoids the check-then-create
    # race when another process creates the directory concurrently.
    if directory:
        os.makedirs(directory, exist_ok=True)

    if os.path.exists(file_path):
        return

    data_format = dict(file_type='wav',
                       sample_format='S16_LE',
                       channel=audio_data['channels'],
                       rate=audio_data['rate'])
    generate_audio_test_data(data_format=data_format,
                             path=file_path,
                             duration_secs=audio_data['duration'],
                             frequencies=audio_data['frequencies'])
    logging.debug('Audio file generated: %s', file_path)
169
170
def generate_playback_file_from_binary_data(audio_data):
    """Generates wav audio file from binary audio data.

    The file is written to A2DP_PLAYBACK_DATA['file'] using the channel count,
    frame rate and sample width from A2DP_PLAYBACK_DATA. An existing file at
    that path is overwritten.

    Args:
        audio_data: The binary audio data, written as the wav frames.
    """
    playback_path = A2DP_PLAYBACK_DATA['file']
    directory = os.path.dirname(playback_path)
    # exist_ok avoids the check-then-create race when the directory is created
    # concurrently; an empty dirname means there is nothing to create.
    if directory:
        os.makedirs(directory, exist_ok=True)

    with wave.open(playback_path, 'wb') as wav_file:
        wav_file.setnchannels(A2DP_PLAYBACK_DATA['channels'])
        wav_file.setframerate(A2DP_PLAYBACK_DATA['rate'])
        wav_file.setsampwidth(A2DP_PLAYBACK_DATA['sample_width'])
        wav_file.writeframes(audio_data)

    logging.debug('wav file generated from binary data: %s', playback_path)
188