1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Stream music through connected device from phone test implementation."""
17import acts
18import os
19import pandas as pd
20import shutil
21import time
22
23import acts_contrib.test_utils.coex.audio_test_utils as atu
24import acts_contrib.test_utils.bt.bt_test_utils as btutils
25from acts import asserts
26from acts_contrib.test_utils.bt import bt_constants
27from acts_contrib.test_utils.bt import BtEnum
28from acts_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
29from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
30from acts_contrib.test_utils.bt.ble_performance_test_utils import plot_graph
31from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
32from acts_contrib.test_utils.bt.loggers import bluetooth_metric_logger as log
33from acts.signals import TestPass, TestError
34
# Directory on the phone into which setup_class pushes the music file.
PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
# Attenuation value applied to the attenuator in setup_class and restored in
# the teardown methods (presumably dB -- TODO confirm against the attenuator
# controller).
INIT_ATTEN = 0
# Pause, in seconds (it is passed to time.sleep), used around stopping the
# audio capture in play_and_record_audio.
WAIT_TIME = 1
38
39
40class A2dpBaseTest(BluetoothBaseTest):
41    """Stream audio file over desired Bluetooth codec configurations.
42
43    Audio file should be a sine wave. Other audio files will not work for the
44    test analysis metrics.
45
46    Device under test is Android phone, connected to headset with a controller
47    that can generate a BluetoothHandsfreeAbstractDevice from test_utils.
48    abstract_devices.bluetooth_handsfree_abstract_device.
    BluetoothHandsfreeAbstractDeviceFactory.
50    """
51    def setup_class(self):
52
53        super().setup_class()
54        self.bt_logger = log.BluetoothMetricLogger.for_test_case()
55        self.dut = self.android_devices[0]
56        req_params = ['audio_params', 'music_files', 'system_path_loss']
57        opt_params = ['bugreport']
58        #'audio_params' is a dict, contains the audio device type, audio streaming
59        #settings such as volumn, duration, audio recording parameters such as
60        #channel, sampling rate/width, and thdn parameters for audio processing
61        self.unpack_userparams(req_params)
62        self.unpack_userparams(opt_params, bugreport=None)
63        # Find music file and push it to the dut
64        music_src = self.music_files[0]
65        music_dest = PHONE_MUSIC_FILE_DIRECTORY
66        success = self.dut.push_system_file(music_src, music_dest)
67        if success:
68            self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
69                                           os.path.basename(music_src))
70        # Initialize media_control class
71        self.media = btutils.MediaControlOverSl4a(self.dut, self.music_file)
72        # Set attenuator to minimum attenuation
73        if hasattr(self, 'attenuators'):
74            self.attenuator = self.attenuators[0]
75            self.attenuator.set_atten(INIT_ATTEN)
76        # Create the BTOE(Bluetooth-Other-End) device object
77        bt_devices = self.user_params.get('bt_devices', [])
78        if bt_devices:
79            attr, idx = bt_devices.split(':')
80            self.bt_device_controller = getattr(self, attr)[int(idx)]
81            self.bt_device = bt_factory().generate(self.bt_device_controller)
82        else:
83            self.log.error('No BT devices config is provided!')
84
85    def teardown_class(self):
86
87        super().teardown_class()
88        if hasattr(self, 'media'):
89            self.media.stop()
90        if hasattr(self, 'attenuator'):
91            self.attenuator.set_atten(INIT_ATTEN)
92        self.dut.droid.bluetoothFactoryReset()
93        self.bt_device.reset()
94        self.bt_device.power_off()
95        btutils.disable_bluetooth(self.dut.droid)
96
97    def setup_test(self):
98
99        super().setup_test()
100        # Initialize audio capture devices
101        self.audio_device = atu.get_audio_capture_device(
102            self.bt_device_controller, self.audio_params)
103        # Reset BT to factory defaults
104        self.dut.droid.bluetoothFactoryReset()
105        self.bt_device.reset()
106        self.bt_device.power_on()
107        btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
108        btutils.connect_phone_to_headset(self.dut, self.bt_device, 60)
109        vol = self.dut.droid.getMaxMediaVolume() * self.audio_params['volume']
110        self.dut.droid.setMediaVolume(0)
111        time.sleep(1)
112        self.dut.droid.setMediaVolume(int(vol))
113
114    def teardown_test(self):
115
116        super().teardown_test()
117        self.dut.droid.bluetoothFactoryReset()
118        self.media.stop()
119        # Set Attenuator to the initial attenuation
120        if hasattr(self, 'attenuator'):
121            self.attenuator.set_atten(INIT_ATTEN)
122        self.bt_device.reset()
123        self.bt_device.power_off()
124        btutils.disable_bluetooth(self.dut.droid)
125
126    def on_pass(self, test_name, begin_time):
127
128        if hasattr(self, 'bugreport') and self.bugreport == 1:
129            self._take_bug_report(test_name, begin_time)
130
131    def play_and_record_audio(self, duration):
132        """Play and record audio for a set duration.
133
134        Args:
135            duration: duration in seconds for music playing
136        Returns:
137            audio_captured: captured audio file path
138        """
139
140        self.log.info('Play and record audio for {} second'.format(duration))
141        self.media.play()
142        proc = self.audio_device.start()
143        time.sleep(duration + WAIT_TIME)
144        proc.kill()
145        time.sleep(WAIT_TIME)
146        proc.kill()
147        audio_captured = self.audio_device.stop()
148        self.media.stop()
149        self.log.info('Audio play and record stopped')
150        asserts.assert_true(audio_captured, 'Audio not recorded')
151        return audio_captured
152
153    def _get_bt_link_metrics(self, tag=''):
154        """Get bt link metrics such as rssi and tx pwls.
155
156        Returns:
157            master_metrics_list: list of metrics of central device
158            slave_metrics_list: list of metric of peripheral device
159        """
160
161        self.raw_bt_metrics_path = os.path.join(self.log_path,
162                                                'BT_Raw_Metrics')
163        self.media.play()
164        # Get master rssi and power level
165        process_data_dict = btutils.get_bt_metric(
166            self.dut, tag=tag, log_path=self.raw_bt_metrics_path)
167        rssi_master = process_data_dict.get('rssi')
168        pwl_master = process_data_dict.get('pwlv')
169        rssi_c0_master = process_data_dict.get('rssi_c0')
170        rssi_c1_master = process_data_dict.get('rssi_c1')
171        txpw_c0_master = process_data_dict.get('txpw_c0')
172        txpw_c1_master = process_data_dict.get('txpw_c1')
173        bftx_master = process_data_dict.get('bftx')
174        divtx_master = process_data_dict.get('divtx')
175
176        if isinstance(self.bt_device_controller,
177                      acts.controllers.android_device.AndroidDevice):
178            rssi_slave = btutils.get_bt_rssi(self.bt_device_controller,
179                                             tag=tag,
180                                             log_path=self.raw_bt_metrics_path)
181        else:
182            rssi_slave = None
183        self.media.stop()
184
185        master_metrics_list = [
186            rssi_master, pwl_master, rssi_c0_master, rssi_c1_master,
187            txpw_c0_master, txpw_c1_master, bftx_master, divtx_master
188        ]
189        slave_metrics_list = [rssi_slave]
190
191        return master_metrics_list, slave_metrics_list
192
193    def run_thdn_analysis(self, audio_captured, tag):
194        """Calculate Total Harmonic Distortion plus Noise for latest recording.
195
196        Store result in self.metrics.
197
198        Args:
199            audio_captured: the captured audio file
200        Returns:
201            thdn: thdn value in a list
202        """
203        # Calculate Total Harmonic Distortion + Noise
204        audio_result = atu.AudioCaptureResult(audio_captured,
205                                              self.audio_params)
206        thdn = audio_result.THDN(**self.audio_params['thdn_params'])
207        file_name = tag + os.path.basename(audio_result.path)
208        file_new = os.path.join(os.path.dirname(audio_result.path), file_name)
209        shutil.copyfile(audio_result.path, file_new)
210        for ch_no, t in enumerate(thdn):
211            self.log.info('THD+N for channel %s: %.4f%%' % (ch_no, t * 100))
212        return thdn
213
214    def run_anomaly_detection(self, audio_captured):
215        """Detect anomalies in latest recording.
216
217        Store result in self.metrics.
218
219        Args:
220            audio_captured: the captured audio file
221        Returns:
222            anom: anom detected in the captured file
223        """
224        # Detect Anomalies
225        audio_result = atu.AudioCaptureResult(audio_captured)
226        anom = audio_result.detect_anomalies(
227            **self.audio_params['anomaly_params'])
228        num_anom = 0
229        for ch_no, anomalies in enumerate(anom):
230            if anomalies:
231                for anomaly in anomalies:
232                    num_anom += 1
233                    start, end = anomaly
234                    self.log.warning(
235                        'Anomaly on channel {} at {}:{}. Duration '
236                        '{} sec'.format(ch_no, start // 60, start % 60,
237                                        end - start))
238        else:
239            self.log.info('%i anomalies detected.' % num_anom)
240        return anom
241
242    def generate_proto(self, data_points, codec_type, sample_rate,
243                       bits_per_sample, channel_mode):
244        """Generate a results protobuf.
245
246        Args:
247            data_points: list of dicts representing info to go into
248              AudioTestDataPoint protobuffer message.
249            codec_type: The codec type config to store in the proto.
250            sample_rate: The sample rate config to store in the proto.
251            bits_per_sample: The bits per sample config to store in the proto.
252            channel_mode: The channel mode config to store in the proto.
253        Returns:
254             dict: Dictionary with key 'proto' mapping to serialized protobuf,
255               'proto_ascii' mapping to human readable protobuf info, and 'test'
256               mapping to the test class name that generated the results.
257        """
258
259        # Populate protobuf
260        test_case_proto = self.bt_logger.proto_module.BluetoothAudioTestResult(
261        )
262
263        for data_point in data_points:
264            audio_data_proto = test_case_proto.data_points.add()
265            log.recursive_assign(audio_data_proto, data_point)
266
267        codec_proto = test_case_proto.a2dp_codec_config
268        codec_proto.codec_type = bt_constants.codec_types[codec_type]
269        codec_proto.sample_rate = int(sample_rate)
270        codec_proto.bits_per_sample = int(bits_per_sample)
271        codec_proto.channel_mode = bt_constants.channel_modes[channel_mode]
272
273        self.bt_logger.add_config_data_to_proto(test_case_proto, self.dut,
274                                                self.bt_device)
275
276        self.bt_logger.add_proto_to_results(test_case_proto,
277                                            self.__class__.__name__)
278
279        proto_dict = self.bt_logger.get_proto_dict(self.__class__.__name__,
280                                                   test_case_proto)
281        del proto_dict["proto_ascii"]
282        return proto_dict
283
284    def set_test_atten(self, atten):
285        """Set the attenuation(s) for current test condition.
286
287        """
288        if hasattr(self, 'dual_chain') and self.dual_chain == 1:
289            ramp_attenuation(self.atten_c0,
290                             atten,
291                             attenuation_step_max=2,
292                             time_wait_in_between=1)
293            self.log.info('Set Chain 0 attenuation to %d dB', atten)
294            ramp_attenuation(self.atten_c1,
295                             atten + self.gain_mismatch,
296                             attenuation_step_max=2,
297                             time_wait_in_between=1)
298            self.log.info('Set Chain 1 attenuation to %d dB',
299                          atten + self.gain_mismatch)
300        else:
301            ramp_attenuation(self.attenuator, atten)
302            self.log.info('Set attenuation to %d dB', atten)
303
304    def run_a2dp_to_max_range(self, codec_config):
305        attenuation_range = range(self.attenuation_vector['start'],
306                                  self.attenuation_vector['stop'] + 1,
307                                  self.attenuation_vector['step'])
308
309        data_points = []
310        self.file_output = os.path.join(
311            self.log_path, '{}.csv'.format(self.current_test_name))
312
313        # Set Codec if needed
314        current_codec = self.dut.droid.bluetoothA2dpGetCurrentCodecConfig()
315        current_codec_type = BtEnum.BluetoothA2dpCodecType(
316            current_codec['codecType']).name
317        if current_codec_type != codec_config['codec_type']:
318            codec_set = btutils.set_bluetooth_codec(self.dut, **codec_config)
319            asserts.assert_true(codec_set, 'Codec configuration failed.')
320        else:
321            self.log.info('Current codec is {}, no need to change'.format(
322                current_codec_type))
323
324        #loop RSSI with the same codec setting
325        for atten in attenuation_range:
326            self.media.play()
327            self.set_test_atten(atten)
328
329            tag = 'codec_{}_attenuation_{}dB_'.format(
330                codec_config['codec_type'], atten)
331            recorded_file = self.play_and_record_audio(
332                self.audio_params['duration'])
333            thdns = self.run_thdn_analysis(recorded_file, tag)
334
335            # Collect Metrics for dashboard
336            [
337                rssi_master, pwl_master, rssi_c0_master, rssi_c1_master,
338                txpw_c0_master, txpw_c1_master, bftx_master, divtx_master
339            ], [rssi_slave] = self._get_bt_link_metrics(tag)
340
341            data_point = {
342                'attenuation_db':
343                int(self.attenuator.get_atten()),
344                'pathloss':
345                atten + self.system_path_loss,
346                'rssi_primary':
347                rssi_master.get(self.dut.serial, -127),
348                'tx_power_level_master':
349                pwl_master.get(self.dut.serial, -127),
350                'rssi_secondary':
351                rssi_slave.get(self.bt_device_controller.serial, -127),
352                'rssi_c0_dut':
353                rssi_c0_master.get(self.dut.serial, -127),
354                'rssi_c1_dut':
355                rssi_c1_master.get(self.dut.serial, -127),
356                'txpw_c0_dut':
357                txpw_c0_master.get(self.dut.serial, -127),
358                'txpw_c1_dut':
359                txpw_c1_master.get(self.dut.serial, -127),
360                'bftx_state':
361                bftx_master.get(self.dut.serial, -127),
362                'divtx_state':
363                divtx_master.get(self.dut.serial, -127),
364                'total_harmonic_distortion_plus_noise_percent':
365                thdns[0] * 100
366            }
367            self.log.info(data_point)
368            # bokeh data for generating BokehFigure
369            bokeh_data = {
370                'x_label': 'Pathloss (dBm)',
371                'primary_y_label': 'RSSI (dBm)',
372                'log_path': self.log_path,
373                'current_test_name': self.current_test_name
374            }
375            #plot_data for adding line to existing BokehFigure
376            plot_data = {
377                'line_one': {
378                    'x_label': 'Pathloss (dBm)',
379                    'primary_y_label': 'RSSI (dBm)',
380                    'x_column': 'pathloss',
381                    'y_column': 'rssi_primary',
382                    'legend': 'DUT RSSI (dBm)',
383                    'marker': 'circle_x',
384                    'y_axis': 'default'
385                },
386                'line_two': {
387                    'x_column': 'pathloss',
388                    'y_column': 'rssi_secondary',
389                    'legend': 'Remote device RSSI (dBm)',
390                    'marker': 'hex',
391                    'y_axis': 'default'
392                },
393                'line_three': {
394                    'x_column': 'pathloss',
395                    'y_column': 'tx_power_level_master',
396                    'legend': 'DUT TX Power (dBm)',
397                    'marker': 'hex',
398                    'y_axis': 'secondary'
399                }
400            }
401
402            # Check thdn for glitches, stop if max range reached
403            if thdns[0] == 0:
404                proto_dict = self.generate_proto(data_points, **codec_config)
405                A2dpRange_df = pd.DataFrame(data_points)
406                A2dpRange_df.to_csv(self.file_output, index=False)
407                plot_graph(A2dpRange_df,
408                           plot_data,
409                           bokeh_data,
410                           secondary_y_label='DUT TX Power')
411                raise TestError(
412                    'Music play/recording is not working properly or Connection has lost'
413                )
414
415            data_points.append(data_point)
416            A2dpRange_df = pd.DataFrame(data_points)
417
418            for thdn in thdns:
419                if thdn >= self.audio_params['thdn_threshold']:
420                    self.log.info(
421                        'Max range at attenuation {} dB'.format(atten))
422                    self.log.info('DUT rssi {} dBm, DUT tx power level {}, '
423                                  'Remote rssi {} dBm'.format(
424                                      rssi_master, pwl_master, rssi_slave))
425                    proto_dict = self.generate_proto(data_points,
426                                                     **codec_config)
427                    A2dpRange_df.to_csv(self.file_output, index=False)
428                    plot_graph(A2dpRange_df,
429                               plot_data,
430                               bokeh_data,
431                               secondary_y_label='DUT TX Power')
432                    return True
433                    raise TestPass('Max range reached and move to next codec',
434                                   extras=proto_dict)
435        # Save Data points to csv
436        A2dpRange_df.to_csv(self.file_output, index=False)
437        # Plot graph
438        plot_graph(A2dpRange_df,
439                   plot_data,
440                   bokeh_data,
441                   secondary_y_label='DUT TX Power')
442        proto_dict = self.generate_proto(data_points, **codec_config)
443        return True
444        raise TestPass('Could not reach max range, need extra attenuation.',
445                       extras=proto_dict)
446