1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3""" 4Created on Thu Jul 16 22:58:03 2020 5 6@author: qijiang 7""" 8 9import os 10import pyvisa 11import time 12import acts_contrib.test_utils.coex.audio_test_utils as atu 13import acts_contrib.test_utils.bt.bt_test_utils as btutils 14import pandas as pd 15from acts import asserts 16from acts_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory 17from acts_contrib.test_utils.bt.A2dpBaseTest import A2dpBaseTest 18from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation 19 20PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music' 21 22 23class RPIAxis(object): 24 def __init__(self, VisaConnectString): 25 """Constructor. 26 Create a Visa connection 27 28 """ 29 rm = pyvisa.ResourceManager() 30 self.instrument = rm.open_resource(VisaConnectString) 31 self.instrument.read_termination = "\n" # make sure we look for newline at the end of strings we read 32 33 def __getattr__(self, attr): 34 return getattr(self.instrument, attr) # Delegate all other attrs 35 36 37class RPIAxis_card(RPIAxis): 38 """ RPIAxis_card() 39 Create an axis 40 41 """ 42 def __init__(self, axis_object): 43 # create an object to communicate to an RPI2 (remote positioner instrument) axis 44 self.axis = axis_object # store pyvisa instrument connection 45 46 def __getattr__(self, attr): 47 return getattr(self.axis, attr) # Delegate all other attrs 48 49 def moveTo(self, where): 50 """ moveTo 51 move to a given position and make sure you arrived at the target. 52 """ 53 # max travale time in seconds. adjust this if you have a really slow positioner! 54 MAXTRAVELTIME = 150 55 t0 = time.time() 56 self.axis.write("SK %d\n" % where) 57 done = False 58 while (not done): 59 if (time.time() - t0) > MAXTRAVELTIME: 60 print("looks like we are stuck!\n") 61 return False 62 response = self.axis.query("*opc?\n") 63 if (response == '1'): 64 return True 65 else: 66 response = self.axis.query("CP?\n") 67 68 # stop the positioner 69 def Stop(self): 70 t0 = time.time() 71 done = False 72 self.axis.write("ST\n") 73 while (not done): 74 if (time.time() - t0) > 2: 75 print("Runaway positioner!\n") 76 return False 77 response = self.axis.query("*opc?\n") 78 if (response == '1'): 79 return True 80 81 # set continuous rotation mode 82 def SetContinuousRotationMode(self): 83 self.axis.write("CR\n") 84 85 # set non continuous rotation mode 86 def SetNonContinuousRotationMode(self): 87 self.axis.write("NCR\n") 88 89 90class BtA2dpOtaRangeTest(A2dpBaseTest): 91 def setup_class(self): 92 93 #'audio_params' is a dict, contains the audio device type, audio streaming 94 #settings such as volume, duration, audio recording parameters such as 95 #channel, sampling rate/width, and thdn parameters for audio processing 96 req_params = [ 97 'audio_params', 'positioner', 'dut_config', 'attenuation_vector' 98 ] 99 opt_params = ['music_files'] 100 self.unpack_userparams(req_params) 101 if len(self.android_devices) > 1: 102 self.dut = self.android_devices[1] 103 self.unpack_userparams(opt_params) 104 music_src = self.music_files[0] 105 music_dest = PHONE_MUSIC_FILE_DIRECTORY 106 success = self.dut.push_system_file(music_src, music_dest) 107 if success: 108 self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY, 109 os.path.basename(music_src)) 110 # Initialize media_control class 111 self.media = btutils.MediaControlOverSl4a(self.dut, 112 self.music_file) 113 # Set attenuator to minimum attenuation 114 self.attenuator = self.attenuators[0] 115 self.attenuator.set_atten(self.attenuation_vector['min']) 116 # Create the BTOE(Bluetooth-Other-End) device object 117 bt_devices = self.user_params.get('bt_devices', []) 118 attr, idx = bt_devices.split(':') 119 self.bt_device_controller = getattr(self, attr)[int(idx)] 120 self.bt_device = bt_factory().generate(self.bt_device_controller) 121 btutils.enable_bqr(self.bt_device_controller) 122 123 #Setup positioner 124 self.PhiAxisAddress = "TCPIP0::{}::{}::SOCKET".format( 125 self.positioner["server_ip"], self.positioner["phi_axis_port"]) 126 self.ThetaAxisAddress = "TCPIP0::{}::{}::SOCKET".format( 127 self.positioner["server_ip"], self.positioner["theta_axis_port"]) 128 self.phi_axis = RPIAxis(self.PhiAxisAddress) 129 self.phi_card = RPIAxis_card(self.phi_axis) 130 self.log.info("*IDN? response: {}".format( 131 self.phi_card.query("*idn?\n"))) 132 self.theta_axis = RPIAxis(self.ThetaAxisAddress) 133 self.theta_card = RPIAxis_card(self.theta_axis) 134 self.log.info("*IDN? response: {}".format( 135 self.theta_card.query("*idn?\n"))) 136 self.phi_card.Stop() 137 self.theta_card.Stop() 138 139 def teardown_class(self): 140 141 if hasattr(self, 'media'): 142 self.media.stop() 143 if hasattr(self, 'attenuator'): 144 self.attenuator.set_atten(self.attenuation_vector['min']) 145 if hasattr(self, 'dut'): 146 self.dut.droid.bluetoothFactoryReset() 147 btutils.disable_bluetooth(self.dut.droid) 148 self.bt_device.reset() 149 self.bt_device.power_off() 150 self.phi_card.moveTo(0) 151 self.theta_card.moveTo(0) 152 153 def setup_test(self): 154 155 # Reset headset 156 self.bt_device.reset() 157 # Initialize audio capture devices 158 self.audio_device = atu.get_audio_capture_device( 159 self.bt_device_controller, self.audio_params) 160 # Connect BT link 161 connected = self.establish_bt_connection() 162 asserts.assert_true(connected, 'BT connection failed') 163 # Output file 164 file_name = 'OTA_Range_Over_Angle_{}_{}.csv'.format( 165 self.dut_config['model'], self.dut_config['screen_placement']) 166 self.file_output = os.path.join(self.log_path, file_name) 167 168 def teardown_test(self): 169 170 if hasattr(self, 'media'): 171 self.media.stop() 172 if hasattr(self, 'attenuator'): 173 self.attenuator.set_atten(self.attenuation_vector['min']) 174 if hasattr(self, 'dut'): 175 self.dut.droid.bluetoothFactoryReset() 176 btutils.disable_bluetooth(self.dut.droid) 177 self.bt_device.reset() 178 self.bt_device.power_off() 179 self.phi_card.moveTo(0) 180 self.theta_card.moveTo(0) 181 182 def a2dp_play(self): 183 184 if hasattr(self, 'dut'): 185 vol = self.dut.droid.getMaxMediaVolume( 186 ) * self.audio_params['volume'] 187 self.dut.droid.setMediaVolume(vol) 188 self.media.play() 189 else: 190 vol = self.bt_device_controller.droid.getMaxMediaVolume( 191 ) * self.audio_params['volume'] 192 self.bt_device_controller.droid.setMediaVolume(vol) 193 self.bt_device.previous_track() 194 self.bt_device.play() 195 196 def a2dp_stop(self): 197 198 if hasattr(self, 'dut'): 199 self.media.stop() 200 else: 201 self.bt_device.pause() 202 203 def establish_bt_connection(self): 204 205 if hasattr(self, 'dut'): 206 self.dut.droid.bluetoothFactoryReset() 207 self.bt_device.reset() 208 self.bt_device.power_on() 209 btutils.enable_bluetooth(self.dut.droid, self.dut.ed) 210 connected = btutils.connect_phone_to_headset( 211 self.dut, self.bt_device, 60) 212 vol = self.dut.droid.getMaxMediaVolume( 213 ) * self.audio_params['volume'] 214 self.dut.droid.setMediaVolume(0) 215 time.sleep(1) 216 self.dut.droid.setMediaVolume(int(vol)) 217 self.media.play() 218 return connected 219 220 elif len(self.bt_device_controller.droid. 221 bluetoothA2dpSinkGetConnectedDevices()) == 0: 222 self.log.warning('Need manual intervention to connect BT link') 223 os.system( 224 'spd-say "Please manually connect BT and start playback"') 225 input('Once fixed, please press ENTER to resume the test') 226 return 1 227 228 def run_thdn_analysis(self, audio_captured): 229 """Calculate Total Harmonic Distortion plus Noise for latest recording. 230 231 Args: 232 audio_captured: the captured audio file 233 Returns: 234 thdn: thdn value in a list 235 """ 236 # Calculate Total Harmonic Distortion + Noise 237 audio_result = atu.AudioCaptureResult(audio_captured, 238 self.audio_params) 239 thdn = audio_result.THDN(**self.audio_params['thdn_params']) 240 return thdn 241 242 def record_audio_and_analyze_thdn(self): 243 244 self.a2dp_play() 245 time.sleep(1) 246 self.audio_device.start() 247 time.sleep(self.audio_params['duration']) 248 audio_captured = self.audio_device.stop() 249 audio_result = atu.AudioCaptureResult(audio_captured, 250 self.audio_params) 251 thdn = audio_result.THDN(**self.audio_params['thdn_params']) 252 self.log.info('THDN is {}'.format(thdn[0])) 253 254 self.a2dp_stop() 255 256 return thdn[0] 257 258 def recover_bt_link(self): 259 """Recover BT link during test. 260 261 Recover BT link from the a2dp sink device 262 263 Returns: 264 connected: signal whether bt link is restored 265 """ 266 #Try to connect from the sink device 267 if len(self.bt_device_controller.droid.bluetoothGetConnectedDevices() 268 ) == 0: 269 self.log.warning('Try to recover BT link') 270 self.attenuator.set_atten(self.attenuation_vector['min']) 271 272 if hasattr(self, 'dut'): 273 connected = self.establish_bt_connection() 274 return connected 275 else: 276 device_bonded = self.bt_device_controller.droid.bluetoothGetBondedDevices( 277 )[0]['address'] 278 trial_count = 0 279 trial_limit = 3 280 self.log.info('Try to reconnect from the sink device') 281 while trial_count < trial_limit: 282 #Connect master device from the sink device 283 time_start = time.time() 284 while time.time() < time_start + 5: 285 try: 286 self.bt_device_controller.droid.bluetoothConnectBonded( 287 device_bonded) 288 break 289 except: 290 pass 291 time.sleep(2) 292 if len(self.bt_device_controller.droid. 293 bluetoothA2dpSinkGetConnectedDevices()) > 0: 294 vol = self.bt_device_controller.droid.getMaxMediaVolume( 295 ) * self.audio_params['volume'] 296 self.bt_device_controller.droid.setMediaVolume(0) 297 time.sleep(1) 298 self.bt_device_controller.droid.setMediaVolume( 299 int(vol)) 300 return 1 301 trial_count += 1 302 #Automated reconnect from sink device doesn't work, start fresh connection 303 if trial_count >= trial_limit: 304 self.log.info( 305 'Need manual intervention on the master device side') 306 connected = self.establish_bt_connection() 307 return connected 308 else: 309 return 1 310 311 def find_bt_max_range_bisection_search(self): 312 313 #First linear search to narrow the bisection search 314 atten_min = self.attenuation_vector['min'] 315 atten_max = self.attenuation_vector['max'] 316 atten_step = self.attenuation_vector['step_bisection'] 317 #Start from initial attenuation 318 atten_left = atten_min 319 atten_right = atten_min 320 while atten_left == atten_right and atten_left < atten_max: 321 atten_now = self.attenuator.get_atten() 322 connected = self.recover_bt_link() 323 if connected == 0: 324 self.log.warning("Skip this angle as BT connection failed") 325 max_range = atten_max 326 return max_range 327 else: 328 self.log.info('Connection restored') 329 ramp_attenuation(self.attenuator, atten_now) 330 self.log.info("Attenuation set to {}".format(atten_now)) 331 time.sleep(2) 332 333 thdn = self.record_audio_and_analyze_thdn() 334 if thdn > self.audio_params['thdn_threshold'] or thdn == 0: 335 #Hit the right limit for bisection search 336 if atten_right == atten_min: 337 self.log.warning('Link breaks at the minimum attenuation') 338 max_range = atten_min 339 return max_range 340 else: 341 atten_right = atten_now 342 self.log.info( 343 'Right limit found at {} dB'.format(atten_right)) 344 else: 345 atten_left = atten_now 346 atten_right = atten_left 347 atten_next = min(atten_now + atten_step, atten_max) 348 ramp_attenuation(self.attenuator, atten_next) 349 if atten_left == atten_right: 350 self.log.warning('Could not reach max range') 351 max_range = atten_max 352 return max_range 353 354 #Start the bisection search 355 self.log.info('Start bisection search between {} dB and {} dB'.format( 356 atten_left, atten_right)) 357 while atten_right - atten_left > 1: 358 connected = self.recover_bt_link() 359 if connected == 0: 360 self.log.warning("Skip this angle as BT connection failed") 361 max_range = atten_max 362 return max_range 363 else: 364 self.log.info('Connection restored') 365 366 atten_mid = round((atten_left + atten_right) / 2) 367 ramp_attenuation(self.attenuator, atten_mid) 368 atten_now = self.attenuator.get_atten() 369 self.log.info("Attenuation set to {}".format(atten_now)) 370 time.sleep(5) 371 thdn = self.record_audio_and_analyze_thdn() 372 if thdn > self.audio_params['thdn_threshold'] or thdn == 0: 373 atten_right = atten_mid 374 max_range = atten_right - 1 375 else: 376 atten_left = atten_mid 377 max_range = atten_left 378 self.log.info('Max range reached at {} dB'.format(max_range)) 379 return max_range 380 381 def find_bt_max_range_linear_fine_search(self): 382 383 thdn = 0.03 384 atten_now = self.attenuator.get_atten() 385 386 while thdn < self.audio_params[ 387 'thdn_threshold'] and thdn != 0 and atten_now < self.attenuation_vector[ 388 'max']: 389 atten_now = self.attenuator.get_atten() 390 self.log.info("Attenuation set to {}".format(atten_now)) 391 thdn = self.record_audio_and_analyze_thdn() 392 self.log.info("THDN is {}".format(thdn)) 393 self.attenuator.set_atten(atten_now + 394 self.attenuation_vector['step_fine']) 395 max_range = self.attenuator.get_atten( 396 ) - self.attenuation_vector['step_fine'] * 2 397 if thdn == 0: 398 self.log.warning( 399 "Music play stopped, link might get lost, max range reached at {} dB" 400 .format(max_range)) 401 else: 402 self.log.info("Max range reached at {}".format(max_range)) 403 if atten_now == self.attenuation_vector['max']: 404 self.log.warning("Fail to reach max range") 405 return max_range 406 407 def test_bisection_search_max(self): 408 409 #Find the BT max range under each angle using bisection search 410 max_range_all = [] 411 412 for phi in self.positioner['phi_range']: 413 414 succeed = self.phi_card.moveTo(phi) 415 if succeed: 416 self.log.info("Phi positioner moved to {} degree".format(phi)) 417 else: 418 self.log.warning( 419 "Fail to move phi positioner to {} degree".format(phi)) 420 self.log.info("Phi positioner moved to {} degree".format(phi)) 421 max_ranges = [phi] 422 423 for theta in self.positioner['theta_range']: 424 425 succeed = self.theta_card.moveTo(theta) 426 if succeed: 427 self.log.info( 428 "Theta positioner moved to {} degree".format(theta)) 429 else: 430 self.log.warning( 431 "Failed to move theta positioner to {} degree".format( 432 theta)) 433 self.log.info( 434 "Theta positioner moved to {} degree".format(theta)) 435 436 ramp_attenuation(self.attenuator, 437 self.attenuation_vector['min']) 438 time.sleep(2) 439 max_range = self.find_bt_max_range_bisection_search() 440 max_ranges.append(max_range) 441 max_range_all.append(max_ranges) 442 columns = ['Phi/Theta'] 443 columns.extend(self.positioner['theta_range']) 444 df = pd.DataFrame(max_range_all, columns=columns) 445 df.to_csv(self.file_output, index=False) 446 447 def test_coarse_search(self): 448 449 #Coarse search to find the highest minimum attenuation can be set to 450 #be a starting point for all angles 451 thdn = 0.03 452 max_atten_reached = 0 453 ramp_attenuation(self.attenuator, 454 self.attenuation_vector['start_coarse']) 455 self.log.info('Start attenuation at {} dB'.format( 456 self.attenuator.get_atten())) 457 while True: 458 atten_now = self.attenuator.get_atten() 459 if atten_now == self.attenuation_vector['max']: 460 if max_atten_reached > 1: 461 self.log.warning( 462 'Can not reach to the highest minimum, attenuator is already set to be max, need to add more attenuation' 463 ) 464 break 465 for phi in self.positioner['phi_range']: 466 if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]: 467 break 468 succeed = self.phi_card.moveTo(phi) 469 if succeed: 470 self.log.info( 471 "Phi positioner moved to {} degree".format(phi)) 472 else: 473 self.log.warning( 474 "Fail to move phi positioner to {} degree".format(phi)) 475 self.log.info("Phi positioner moved to {} degree".format(phi)) 476 477 for theta in self.positioner['theta_range']: 478 479 succeed = self.theta_card.moveTo(theta) 480 if succeed: 481 self.log.info( 482 "Theta positioner moved to {} degree".format( 483 theta)) 484 else: 485 self.log.warning( 486 "Failed to move theta positioner to {} degree". 487 format(theta)) 488 self.log.info( 489 "Theta positioner moved to {} degree".format(theta)) 490 491 thdn = self.record_audio_and_analyze_thdn() 492 self.log.info( 493 'THDN at thea {} degree, phi {} degree is {}'.format( 494 theta, phi, thdn)) 495 if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]: 496 break 497 if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]: 498 highest_max = self.attenuator.get_atten( 499 ) - self.attenuation_vector['step_coarse'] 500 self.log.info( 501 'Highest minimum attenuation is {} dB, fine search can start from there' 502 .format(highest_max)) 503 break 504 atten_new = min(atten_now + self.attenuation_vector['step_coarse'], 505 self.attenuation_vector['max']) 506 if atten_new == self.attenuation_vector['max']: 507 max_atten_reached += 1 508 self.attenuator.set_atten(atten_new) 509 self.log.info('\nSetting attenuator to {} dB'.format( 510 self.attenuator.get_atten())) 511 512 def test_finestep_search_max(self): 513 514 #Find the BT max range under each angle with a finer step search 515 max_range_all = [] 516 for phi in self.positioner['phi_range']: 517 518 succeed = self.phi_card.moveTo(phi) 519 if succeed: 520 self.log.info("Phi positioner moved to {} degree".format(phi)) 521 else: 522 self.log.warning( 523 "Fail to move phi positioner to {} degree".format(phi)) 524 self.log.info("Phi positioner moved to {} degree".format(phi)) 525 max_ranges = [phi] 526 527 for theta in self.positioner['theta_range']: 528 529 succeed = self.theta_card.moveTo(theta) 530 if succeed: 531 self.log.info( 532 "Theta positioner moved to {} degree".format(theta)) 533 else: 534 self.log.warning( 535 "Failed to move theta positioner to {} degree".format( 536 theta)) 537 self.log.info( 538 "Theta positioner moved to {} degree".format(theta)) 539 connected = self.recover_bt_link() 540 if connected == 0: 541 self.log.warning("Skip this angle as BT connection failed") 542 max_range = self.attenuation_vector['max'] 543 return max_range 544 else: 545 self.log.info('Connection restored') 546 ramp_attenuation(self.attenuator, 547 self.attenuation_vector['start_fine']) 548 max_range = self.find_bt_max_range_linear_fine_search() 549 max_ranges.append(max_range) 550 max_range_all.append(max_ranges) 551 columns = ['Phi/Theta'] 552 columns.extend(self.positioner['theta_range']) 553 df_range = pd.DataFrame(max_range_all, columns=columns) 554 df_range.to_csv(self.file_output, index=False) 555