1#!/usr/bin/env python3 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16"""Stream music through connected device from phone test implementation.""" 17import acts 18import os 19import pandas as pd 20import shutil 21import time 22 23import acts_contrib.test_utils.coex.audio_test_utils as atu 24import acts_contrib.test_utils.bt.bt_test_utils as btutils 25from acts import asserts 26from acts_contrib.test_utils.bt import bt_constants 27from acts_contrib.test_utils.bt import BtEnum 28from acts_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory 29from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 30from acts_contrib.test_utils.bt.ble_performance_test_utils import plot_graph 31from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation 32from acts_contrib.test_utils.bt.loggers import bluetooth_metric_logger as log 33from acts.signals import TestPass, TestError 34 35PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music' 36INIT_ATTEN = 0 37WAIT_TIME = 1 38 39 40class A2dpBaseTest(BluetoothBaseTest): 41 """Stream audio file over desired Bluetooth codec configurations. 42 43 Audio file should be a sine wave. Other audio files will not work for the 44 test analysis metrics. 45 46 Device under test is Android phone, connected to headset with a controller 47 that can generate a BluetoothHandsfreeAbstractDevice from test_utils. 48 abstract_devices.bluetooth_handsfree_abstract_device. 49 BuetoothHandsfreeAbstractDeviceFactory. 50 """ 51 def setup_class(self): 52 53 super().setup_class() 54 self.bt_logger = log.BluetoothMetricLogger.for_test_case() 55 self.dut = self.android_devices[0] 56 req_params = ['audio_params', 'music_files', 'system_path_loss'] 57 opt_params = ['bugreport'] 58 #'audio_params' is a dict, contains the audio device type, audio streaming 59 #settings such as volumn, duration, audio recording parameters such as 60 #channel, sampling rate/width, and thdn parameters for audio processing 61 self.unpack_userparams(req_params) 62 self.unpack_userparams(opt_params, bugreport=None) 63 # Find music file and push it to the dut 64 music_src = self.music_files[0] 65 music_dest = PHONE_MUSIC_FILE_DIRECTORY 66 success = self.dut.push_system_file(music_src, music_dest) 67 if success: 68 self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY, 69 os.path.basename(music_src)) 70 # Initialize media_control class 71 self.media = btutils.MediaControlOverSl4a(self.dut, self.music_file) 72 # Set attenuator to minimum attenuation 73 if hasattr(self, 'attenuators'): 74 self.attenuator = self.attenuators[0] 75 self.attenuator.set_atten(INIT_ATTEN) 76 # Create the BTOE(Bluetooth-Other-End) device object 77 bt_devices = self.user_params.get('bt_devices', []) 78 if bt_devices: 79 attr, idx = bt_devices.split(':') 80 self.bt_device_controller = getattr(self, attr)[int(idx)] 81 self.bt_device = bt_factory().generate(self.bt_device_controller) 82 else: 83 self.log.error('No BT devices config is provided!') 84 85 def teardown_class(self): 86 87 super().teardown_class() 88 if hasattr(self, 'media'): 89 self.media.stop() 90 if hasattr(self, 'attenuator'): 91 self.attenuator.set_atten(INIT_ATTEN) 92 self.dut.droid.bluetoothFactoryReset() 93 self.bt_device.reset() 94 self.bt_device.power_off() 95 btutils.disable_bluetooth(self.dut.droid) 96 97 def setup_test(self): 98 99 super().setup_test() 100 # Initialize audio capture devices 101 self.audio_device = atu.get_audio_capture_device( 102 self.bt_device_controller, self.audio_params) 103 # Reset BT to factory defaults 104 self.dut.droid.bluetoothFactoryReset() 105 self.bt_device.reset() 106 self.bt_device.power_on() 107 btutils.enable_bluetooth(self.dut.droid, self.dut.ed) 108 btutils.connect_phone_to_headset(self.dut, self.bt_device, 60) 109 vol = self.dut.droid.getMaxMediaVolume() * self.audio_params['volume'] 110 self.dut.droid.setMediaVolume(0) 111 time.sleep(1) 112 self.dut.droid.setMediaVolume(int(vol)) 113 114 def teardown_test(self): 115 116 super().teardown_test() 117 self.dut.droid.bluetoothFactoryReset() 118 self.media.stop() 119 # Set Attenuator to the initial attenuation 120 if hasattr(self, 'attenuator'): 121 self.attenuator.set_atten(INIT_ATTEN) 122 self.bt_device.reset() 123 self.bt_device.power_off() 124 btutils.disable_bluetooth(self.dut.droid) 125 126 def on_pass(self, test_name, begin_time): 127 128 if hasattr(self, 'bugreport') and self.bugreport == 1: 129 self._take_bug_report(test_name, begin_time) 130 131 def play_and_record_audio(self, duration): 132 """Play and record audio for a set duration. 133 134 Args: 135 duration: duration in seconds for music playing 136 Returns: 137 audio_captured: captured audio file path 138 """ 139 140 self.log.info('Play and record audio for {} second'.format(duration)) 141 self.media.play() 142 proc = self.audio_device.start() 143 time.sleep(duration + WAIT_TIME) 144 proc.kill() 145 time.sleep(WAIT_TIME) 146 proc.kill() 147 audio_captured = self.audio_device.stop() 148 self.media.stop() 149 self.log.info('Audio play and record stopped') 150 asserts.assert_true(audio_captured, 'Audio not recorded') 151 return audio_captured 152 153 def _get_bt_link_metrics(self, tag=''): 154 """Get bt link metrics such as rssi and tx pwls. 155 156 Returns: 157 master_metrics_list: list of metrics of central device 158 slave_metrics_list: list of metric of peripheral device 159 """ 160 161 self.raw_bt_metrics_path = os.path.join(self.log_path, 162 'BT_Raw_Metrics') 163 self.media.play() 164 # Get master rssi and power level 165 process_data_dict = btutils.get_bt_metric( 166 self.dut, tag=tag, log_path=self.raw_bt_metrics_path) 167 rssi_master = process_data_dict.get('rssi') 168 pwl_master = process_data_dict.get('pwlv') 169 rssi_c0_master = process_data_dict.get('rssi_c0') 170 rssi_c1_master = process_data_dict.get('rssi_c1') 171 txpw_c0_master = process_data_dict.get('txpw_c0') 172 txpw_c1_master = process_data_dict.get('txpw_c1') 173 bftx_master = process_data_dict.get('bftx') 174 divtx_master = process_data_dict.get('divtx') 175 176 if isinstance(self.bt_device_controller, 177 acts.controllers.android_device.AndroidDevice): 178 rssi_slave = btutils.get_bt_rssi(self.bt_device_controller, 179 tag=tag, 180 log_path=self.raw_bt_metrics_path) 181 else: 182 rssi_slave = None 183 self.media.stop() 184 185 master_metrics_list = [ 186 rssi_master, pwl_master, rssi_c0_master, rssi_c1_master, 187 txpw_c0_master, txpw_c1_master, bftx_master, divtx_master 188 ] 189 slave_metrics_list = [rssi_slave] 190 191 return master_metrics_list, slave_metrics_list 192 193 def run_thdn_analysis(self, audio_captured, tag): 194 """Calculate Total Harmonic Distortion plus Noise for latest recording. 195 196 Store result in self.metrics. 197 198 Args: 199 audio_captured: the captured audio file 200 Returns: 201 thdn: thdn value in a list 202 """ 203 # Calculate Total Harmonic Distortion + Noise 204 audio_result = atu.AudioCaptureResult(audio_captured, 205 self.audio_params) 206 thdn = audio_result.THDN(**self.audio_params['thdn_params']) 207 file_name = tag + os.path.basename(audio_result.path) 208 file_new = os.path.join(os.path.dirname(audio_result.path), file_name) 209 shutil.copyfile(audio_result.path, file_new) 210 for ch_no, t in enumerate(thdn): 211 self.log.info('THD+N for channel %s: %.4f%%' % (ch_no, t * 100)) 212 return thdn 213 214 def run_anomaly_detection(self, audio_captured): 215 """Detect anomalies in latest recording. 216 217 Store result in self.metrics. 218 219 Args: 220 audio_captured: the captured audio file 221 Returns: 222 anom: anom detected in the captured file 223 """ 224 # Detect Anomalies 225 audio_result = atu.AudioCaptureResult(audio_captured) 226 anom = audio_result.detect_anomalies( 227 **self.audio_params['anomaly_params']) 228 num_anom = 0 229 for ch_no, anomalies in enumerate(anom): 230 if anomalies: 231 for anomaly in anomalies: 232 num_anom += 1 233 start, end = anomaly 234 self.log.warning( 235 'Anomaly on channel {} at {}:{}. Duration ' 236 '{} sec'.format(ch_no, start // 60, start % 60, 237 end - start)) 238 else: 239 self.log.info('%i anomalies detected.' % num_anom) 240 return anom 241 242 def generate_proto(self, data_points, codec_type, sample_rate, 243 bits_per_sample, channel_mode): 244 """Generate a results protobuf. 245 246 Args: 247 data_points: list of dicts representing info to go into 248 AudioTestDataPoint protobuffer message. 249 codec_type: The codec type config to store in the proto. 250 sample_rate: The sample rate config to store in the proto. 251 bits_per_sample: The bits per sample config to store in the proto. 252 channel_mode: The channel mode config to store in the proto. 253 Returns: 254 dict: Dictionary with key 'proto' mapping to serialized protobuf, 255 'proto_ascii' mapping to human readable protobuf info, and 'test' 256 mapping to the test class name that generated the results. 257 """ 258 259 # Populate protobuf 260 test_case_proto = self.bt_logger.proto_module.BluetoothAudioTestResult( 261 ) 262 263 for data_point in data_points: 264 audio_data_proto = test_case_proto.data_points.add() 265 log.recursive_assign(audio_data_proto, data_point) 266 267 codec_proto = test_case_proto.a2dp_codec_config 268 codec_proto.codec_type = bt_constants.codec_types[codec_type] 269 codec_proto.sample_rate = int(sample_rate) 270 codec_proto.bits_per_sample = int(bits_per_sample) 271 codec_proto.channel_mode = bt_constants.channel_modes[channel_mode] 272 273 self.bt_logger.add_config_data_to_proto(test_case_proto, self.dut, 274 self.bt_device) 275 276 self.bt_logger.add_proto_to_results(test_case_proto, 277 self.__class__.__name__) 278 279 proto_dict = self.bt_logger.get_proto_dict(self.__class__.__name__, 280 test_case_proto) 281 del proto_dict["proto_ascii"] 282 return proto_dict 283 284 def set_test_atten(self, atten): 285 """Set the attenuation(s) for current test condition. 286 287 """ 288 if hasattr(self, 'dual_chain') and self.dual_chain == 1: 289 ramp_attenuation(self.atten_c0, 290 atten, 291 attenuation_step_max=2, 292 time_wait_in_between=1) 293 self.log.info('Set Chain 0 attenuation to %d dB', atten) 294 ramp_attenuation(self.atten_c1, 295 atten + self.gain_mismatch, 296 attenuation_step_max=2, 297 time_wait_in_between=1) 298 self.log.info('Set Chain 1 attenuation to %d dB', 299 atten + self.gain_mismatch) 300 else: 301 ramp_attenuation(self.attenuator, atten) 302 self.log.info('Set attenuation to %d dB', atten) 303 304 def run_a2dp_to_max_range(self, codec_config): 305 attenuation_range = range(self.attenuation_vector['start'], 306 self.attenuation_vector['stop'] + 1, 307 self.attenuation_vector['step']) 308 309 data_points = [] 310 self.file_output = os.path.join( 311 self.log_path, '{}.csv'.format(self.current_test_name)) 312 313 # Set Codec if needed 314 current_codec = self.dut.droid.bluetoothA2dpGetCurrentCodecConfig() 315 current_codec_type = BtEnum.BluetoothA2dpCodecType( 316 current_codec['codecType']).name 317 if current_codec_type != codec_config['codec_type']: 318 codec_set = btutils.set_bluetooth_codec(self.dut, **codec_config) 319 asserts.assert_true(codec_set, 'Codec configuration failed.') 320 else: 321 self.log.info('Current codec is {}, no need to change'.format( 322 current_codec_type)) 323 324 #loop RSSI with the same codec setting 325 for atten in attenuation_range: 326 self.media.play() 327 self.set_test_atten(atten) 328 329 tag = 'codec_{}_attenuation_{}dB_'.format( 330 codec_config['codec_type'], atten) 331 recorded_file = self.play_and_record_audio( 332 self.audio_params['duration']) 333 thdns = self.run_thdn_analysis(recorded_file, tag) 334 335 # Collect Metrics for dashboard 336 [ 337 rssi_master, pwl_master, rssi_c0_master, rssi_c1_master, 338 txpw_c0_master, txpw_c1_master, bftx_master, divtx_master 339 ], [rssi_slave] = self._get_bt_link_metrics(tag) 340 341 data_point = { 342 'attenuation_db': 343 int(self.attenuator.get_atten()), 344 'pathloss': 345 atten + self.system_path_loss, 346 'rssi_primary': 347 rssi_master.get(self.dut.serial, -127), 348 'tx_power_level_master': 349 pwl_master.get(self.dut.serial, -127), 350 'rssi_secondary': 351 rssi_slave.get(self.bt_device_controller.serial, -127), 352 'rssi_c0_dut': 353 rssi_c0_master.get(self.dut.serial, -127), 354 'rssi_c1_dut': 355 rssi_c1_master.get(self.dut.serial, -127), 356 'txpw_c0_dut': 357 txpw_c0_master.get(self.dut.serial, -127), 358 'txpw_c1_dut': 359 txpw_c1_master.get(self.dut.serial, -127), 360 'bftx_state': 361 bftx_master.get(self.dut.serial, -127), 362 'divtx_state': 363 divtx_master.get(self.dut.serial, -127), 364 'total_harmonic_distortion_plus_noise_percent': 365 thdns[0] * 100 366 } 367 self.log.info(data_point) 368 # bokeh data for generating BokehFigure 369 bokeh_data = { 370 'x_label': 'Pathloss (dBm)', 371 'primary_y_label': 'RSSI (dBm)', 372 'log_path': self.log_path, 373 'current_test_name': self.current_test_name 374 } 375 #plot_data for adding line to existing BokehFigure 376 plot_data = { 377 'line_one': { 378 'x_label': 'Pathloss (dBm)', 379 'primary_y_label': 'RSSI (dBm)', 380 'x_column': 'pathloss', 381 'y_column': 'rssi_primary', 382 'legend': 'DUT RSSI (dBm)', 383 'marker': 'circle_x', 384 'y_axis': 'default' 385 }, 386 'line_two': { 387 'x_column': 'pathloss', 388 'y_column': 'rssi_secondary', 389 'legend': 'Remote device RSSI (dBm)', 390 'marker': 'hex', 391 'y_axis': 'default' 392 }, 393 'line_three': { 394 'x_column': 'pathloss', 395 'y_column': 'tx_power_level_master', 396 'legend': 'DUT TX Power (dBm)', 397 'marker': 'hex', 398 'y_axis': 'secondary' 399 } 400 } 401 402 # Check thdn for glitches, stop if max range reached 403 if thdns[0] == 0: 404 proto_dict = self.generate_proto(data_points, **codec_config) 405 A2dpRange_df = pd.DataFrame(data_points) 406 A2dpRange_df.to_csv(self.file_output, index=False) 407 plot_graph(A2dpRange_df, 408 plot_data, 409 bokeh_data, 410 secondary_y_label='DUT TX Power') 411 raise TestError( 412 'Music play/recording is not working properly or Connection has lost' 413 ) 414 415 data_points.append(data_point) 416 A2dpRange_df = pd.DataFrame(data_points) 417 418 for thdn in thdns: 419 if thdn >= self.audio_params['thdn_threshold']: 420 self.log.info( 421 'Max range at attenuation {} dB'.format(atten)) 422 self.log.info('DUT rssi {} dBm, DUT tx power level {}, ' 423 'Remote rssi {} dBm'.format( 424 rssi_master, pwl_master, rssi_slave)) 425 proto_dict = self.generate_proto(data_points, 426 **codec_config) 427 A2dpRange_df.to_csv(self.file_output, index=False) 428 plot_graph(A2dpRange_df, 429 plot_data, 430 bokeh_data, 431 secondary_y_label='DUT TX Power') 432 return True 433 raise TestPass('Max range reached and move to next codec', 434 extras=proto_dict) 435 # Save Data points to csv 436 A2dpRange_df.to_csv(self.file_output, index=False) 437 # Plot graph 438 plot_graph(A2dpRange_df, 439 plot_data, 440 bokeh_data, 441 secondary_y_label='DUT TX Power') 442 proto_dict = self.generate_proto(data_points, **codec_config) 443 return True 444 raise TestPass('Could not reach max range, need extra attenuation.', 445 extras=proto_dict) 446