• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Created on Thu Jul 16 22:58:03 2020
5
6@author: qijiang
7"""
8
9import os
10import pyvisa
11import time
12import acts_contrib.test_utils.coex.audio_test_utils as atu
13import acts_contrib.test_utils.bt.bt_test_utils as btutils
14import pandas as pd
15from acts import asserts
16from acts_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
17from acts_contrib.test_utils.bt.A2dpBaseTest import A2dpBaseTest
18from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
19
20PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
21
22
class RPIAxis(object):
    """Thin proxy around a PyVISA resource for one positioner axis.

    Any attribute that is not defined on this object is looked up on the
    underlying instrument, so callers can use ``write``/``query`` etc.
    directly on an ``RPIAxis`` instance.
    """

    def __init__(self, VisaConnectString):
        """Open a VISA connection.

        Args:
            VisaConnectString: VISA resource string handed to
                ``ResourceManager.open_resource``.
        """
        rm = pyvisa.ResourceManager()
        self.instrument = rm.open_resource(VisaConnectString)
        # Replies are newline terminated; let the resource strip the newline.
        self.instrument.read_termination = "\n"

    def __getattr__(self, attr):
        """Delegate every attribute not found on this object to the instrument."""
        # __getattr__ only runs when normal lookup failed. If 'instrument'
        # itself is the missing name (e.g. __init__ raised before assigning
        # it), delegating would re-enter this method forever.
        if attr == 'instrument':
            raise AttributeError(attr)
        return getattr(self.instrument, attr)
35
36
class RPIAxis_card(RPIAxis):
    """Command helper for one axis of the RPI (remote positioner instrument).

    Wraps an already opened axis connection and adds move/stop helpers.
    Attributes not defined here are delegated to the wrapped axis object.
    """

    def __init__(self, axis_object):
        """Store the axis connection.

        The parent constructor is intentionally not called: it would open a
        new VISA connection, while this class reuses the one it is given.

        Args:
            axis_object: object exposing ``write`` and ``query`` for the axis.
        """
        self.axis = axis_object

    def __getattr__(self, attr):
        """Delegate every attribute not found on this object to the axis."""
        # Avoid infinite recursion when 'axis' itself has not been assigned.
        if attr == 'axis':
            raise AttributeError(attr)
        return getattr(self.axis, attr)

    def moveTo(self, where, max_travel_time=150):
        """Move to a given position and wait until the move is reported done.

        Args:
            where: target position, formatted as an integer into the
                ``SK`` command.
            max_travel_time: maximum travel time in seconds. Raise this for
                a really slow positioner.

        Returns:
            True once the ``*opc?`` query answers '1', False if that did not
            happen within max_travel_time seconds.
        """
        t0 = time.time()
        self.axis.write("SK %d\n" % where)
        while (time.time() - t0) <= max_travel_time:
            if self.axis.query("*opc?\n") == '1':
                return True
            # The reply is not used; the query is kept so the command
            # sequence sent to the positioner stays unchanged.
            self.axis.query("CP?\n")
        print("looks like we are stuck!\n")
        return False

    def Stop(self, timeout=2):
        """Stop the positioner.

        Args:
            timeout: seconds to wait for the ``*opc?`` query to answer '1'.

        Returns:
            True when completion was reported in time, False otherwise.
        """
        t0 = time.time()
        self.axis.write("ST\n")
        while (time.time() - t0) <= timeout:
            if self.axis.query("*opc?\n") == '1':
                return True
        print("Runaway positioner!\n")
        return False

    def SetContinuousRotationMode(self):
        """Set continuous rotation mode (sends the ``CR`` command)."""
        self.axis.write("CR\n")

    def SetNonContinuousRotationMode(self):
        """Set non continuous rotation mode (sends the ``NCR`` command)."""
        self.axis.write("NCR\n")
88
89
90class BtA2dpOtaRangeTest(A2dpBaseTest):
91    def setup_class(self):
92
93        #'audio_params' is a dict, contains the audio device type, audio streaming
94        #settings such as volume, duration, audio recording parameters such as
95        #channel, sampling rate/width, and thdn parameters for audio processing
96        req_params = [
97            'audio_params', 'positioner', 'dut_config', 'attenuation_vector'
98        ]
99        opt_params = ['music_files']
100        self.unpack_userparams(req_params)
101        if len(self.android_devices) > 1:
102            self.dut = self.android_devices[1]
103            self.unpack_userparams(opt_params)
104            music_src = self.music_files[0]
105            music_dest = PHONE_MUSIC_FILE_DIRECTORY
106            success = self.dut.push_system_file(music_src, music_dest)
107            if success:
108                self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
109                                               os.path.basename(music_src))
110            # Initialize media_control class
111            self.media = btutils.MediaControlOverSl4a(self.dut,
112                                                      self.music_file)
113        # Set attenuator to minimum attenuation
114        self.attenuator = self.attenuators[0]
115        self.attenuator.set_atten(self.attenuation_vector['min'])
116        # Create the BTOE(Bluetooth-Other-End) device object
117        bt_devices = self.user_params.get('bt_devices', [])
118        attr, idx = bt_devices.split(':')
119        self.bt_device_controller = getattr(self, attr)[int(idx)]
120        self.bt_device = bt_factory().generate(self.bt_device_controller)
121        btutils.enable_bqr(self.bt_device_controller)
122
123        #Setup positioner
124        self.PhiAxisAddress = "TCPIP0::{}::{}::SOCKET".format(
125            self.positioner["server_ip"], self.positioner["phi_axis_port"])
126        self.ThetaAxisAddress = "TCPIP0::{}::{}::SOCKET".format(
127            self.positioner["server_ip"], self.positioner["theta_axis_port"])
128        self.phi_axis = RPIAxis(self.PhiAxisAddress)
129        self.phi_card = RPIAxis_card(self.phi_axis)
130        self.log.info("*IDN? response: {}".format(
131            self.phi_card.query("*idn?\n")))
132        self.theta_axis = RPIAxis(self.ThetaAxisAddress)
133        self.theta_card = RPIAxis_card(self.theta_axis)
134        self.log.info("*IDN? response: {}".format(
135            self.theta_card.query("*idn?\n")))
136        self.phi_card.Stop()
137        self.theta_card.Stop()
138
139    def teardown_class(self):
140
141        if hasattr(self, 'media'):
142            self.media.stop()
143        if hasattr(self, 'attenuator'):
144            self.attenuator.set_atten(self.attenuation_vector['min'])
145        if hasattr(self, 'dut'):
146            self.dut.droid.bluetoothFactoryReset()
147            btutils.disable_bluetooth(self.dut.droid)
148        self.bt_device.reset()
149        self.bt_device.power_off()
150        self.phi_card.moveTo(0)
151        self.theta_card.moveTo(0)
152
153    def setup_test(self):
154
155        # Reset headset
156        self.bt_device.reset()
157        # Initialize audio capture devices
158        self.audio_device = atu.get_audio_capture_device(
159            self.bt_device_controller, self.audio_params)
160        # Connect BT link
161        connected = self.establish_bt_connection()
162        asserts.assert_true(connected, 'BT connection failed')
163        # Output file
164        file_name = 'OTA_Range_Over_Angle_{}_{}.csv'.format(
165            self.dut_config['model'], self.dut_config['screen_placement'])
166        self.file_output = os.path.join(self.log_path, file_name)
167
168    def teardown_test(self):
169
170        if hasattr(self, 'media'):
171            self.media.stop()
172        if hasattr(self, 'attenuator'):
173            self.attenuator.set_atten(self.attenuation_vector['min'])
174        if hasattr(self, 'dut'):
175            self.dut.droid.bluetoothFactoryReset()
176            btutils.disable_bluetooth(self.dut.droid)
177        self.bt_device.reset()
178        self.bt_device.power_off()
179        self.phi_card.moveTo(0)
180        self.theta_card.moveTo(0)
181
182    def a2dp_play(self):
183
184        if hasattr(self, 'dut'):
185            vol = self.dut.droid.getMaxMediaVolume(
186            ) * self.audio_params['volume']
187            self.dut.droid.setMediaVolume(vol)
188            self.media.play()
189        else:
190            vol = self.bt_device_controller.droid.getMaxMediaVolume(
191            ) * self.audio_params['volume']
192            self.bt_device_controller.droid.setMediaVolume(vol)
193            self.bt_device.previous_track()
194            self.bt_device.play()
195
196    def a2dp_stop(self):
197
198        if hasattr(self, 'dut'):
199            self.media.stop()
200        else:
201            self.bt_device.pause()
202
203    def establish_bt_connection(self):
204
205        if hasattr(self, 'dut'):
206            self.dut.droid.bluetoothFactoryReset()
207            self.bt_device.reset()
208            self.bt_device.power_on()
209            btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
210            connected = btutils.connect_phone_to_headset(
211                self.dut, self.bt_device, 60)
212            vol = self.dut.droid.getMaxMediaVolume(
213            ) * self.audio_params['volume']
214            self.dut.droid.setMediaVolume(0)
215            time.sleep(1)
216            self.dut.droid.setMediaVolume(int(vol))
217            self.media.play()
218            return connected
219
220        elif len(self.bt_device_controller.droid.
221                 bluetoothA2dpSinkGetConnectedDevices()) == 0:
222            self.log.warning('Need manual intervention to connect BT link')
223            os.system(
224                'spd-say "Please manually connect BT and start playback"')
225            input('Once fixed, please press ENTER to resume the test')
226            return 1
227
228    def run_thdn_analysis(self, audio_captured):
229        """Calculate Total Harmonic Distortion plus Noise for latest recording.
230
231        Args:
232            audio_captured: the captured audio file
233        Returns:
234            thdn: thdn value in a list
235        """
236        # Calculate Total Harmonic Distortion + Noise
237        audio_result = atu.AudioCaptureResult(audio_captured,
238                                              self.audio_params)
239        thdn = audio_result.THDN(**self.audio_params['thdn_params'])
240        return thdn
241
242    def record_audio_and_analyze_thdn(self):
243
244        self.a2dp_play()
245        time.sleep(1)
246        self.audio_device.start()
247        time.sleep(self.audio_params['duration'])
248        audio_captured = self.audio_device.stop()
249        audio_result = atu.AudioCaptureResult(audio_captured,
250                                              self.audio_params)
251        thdn = audio_result.THDN(**self.audio_params['thdn_params'])
252        self.log.info('THDN is {}'.format(thdn[0]))
253
254        self.a2dp_stop()
255
256        return thdn[0]
257
258    def recover_bt_link(self):
259        """Recover BT link during test.
260
261        Recover BT link from the a2dp sink device
262
263        Returns:
264            connected: signal whether bt link is restored
265        """
266        #Try to connect from the sink device
267        if len(self.bt_device_controller.droid.bluetoothGetConnectedDevices()
268               ) == 0:
269            self.log.warning('Try to recover BT link')
270            self.attenuator.set_atten(self.attenuation_vector['min'])
271
272            if hasattr(self, 'dut'):
273                connected = self.establish_bt_connection()
274                return connected
275            else:
276                device_bonded = self.bt_device_controller.droid.bluetoothGetBondedDevices(
277                )[0]['address']
278                trial_count = 0
279                trial_limit = 3
280                self.log.info('Try to reconnect from the sink device')
281                while trial_count < trial_limit:
282                    #Connect master device from the sink device
283                    time_start = time.time()
284                    while time.time() < time_start + 5:
285                        try:
286                            self.bt_device_controller.droid.bluetoothConnectBonded(
287                                device_bonded)
288                            break
289                        except:
290                            pass
291                    time.sleep(2)
292                    if len(self.bt_device_controller.droid.
293                           bluetoothA2dpSinkGetConnectedDevices()) > 0:
294                        vol = self.bt_device_controller.droid.getMaxMediaVolume(
295                        ) * self.audio_params['volume']
296                        self.bt_device_controller.droid.setMediaVolume(0)
297                        time.sleep(1)
298                        self.bt_device_controller.droid.setMediaVolume(
299                            int(vol))
300                        return 1
301                    trial_count += 1
302                #Automated reconnect from sink device doesn't work, start fresh connection
303                if trial_count >= trial_limit:
304                    self.log.info(
305                        'Need manual intervention on the master device side')
306                    connected = self.establish_bt_connection()
307                    return connected
308        else:
309            return 1
310
311    def find_bt_max_range_bisection_search(self):
312
313        #First linear search to narrow the bisection search
314        atten_min = self.attenuation_vector['min']
315        atten_max = self.attenuation_vector['max']
316        atten_step = self.attenuation_vector['step_bisection']
317        #Start from initial attenuation
318        atten_left = atten_min
319        atten_right = atten_min
320        while atten_left == atten_right and atten_left < atten_max:
321            atten_now = self.attenuator.get_atten()
322            connected = self.recover_bt_link()
323            if connected == 0:
324                self.log.warning("Skip this angle as BT connection failed")
325                max_range = atten_max
326                return max_range
327            else:
328                self.log.info('Connection restored')
329            ramp_attenuation(self.attenuator, atten_now)
330            self.log.info("Attenuation set to {}".format(atten_now))
331            time.sleep(2)
332
333            thdn = self.record_audio_and_analyze_thdn()
334            if thdn > self.audio_params['thdn_threshold'] or thdn == 0:
335                #Hit the right limit for bisection search
336                if atten_right == atten_min:
337                    self.log.warning('Link breaks at the minimum attenuation')
338                    max_range = atten_min
339                    return max_range
340                else:
341                    atten_right = atten_now
342                    self.log.info(
343                        'Right limit found at {} dB'.format(atten_right))
344            else:
345                atten_left = atten_now
346                atten_right = atten_left
347                atten_next = min(atten_now + atten_step, atten_max)
348                ramp_attenuation(self.attenuator, atten_next)
349        if atten_left == atten_right:
350            self.log.warning('Could not reach max range')
351            max_range = atten_max
352            return max_range
353
354        #Start the bisection search
355        self.log.info('Start bisection search between {} dB and {} dB'.format(
356            atten_left, atten_right))
357        while atten_right - atten_left > 1:
358            connected = self.recover_bt_link()
359            if connected == 0:
360                self.log.warning("Skip this angle as BT connection failed")
361                max_range = atten_max
362                return max_range
363            else:
364                self.log.info('Connection restored')
365
366            atten_mid = round((atten_left + atten_right) / 2)
367            ramp_attenuation(self.attenuator, atten_mid)
368            atten_now = self.attenuator.get_atten()
369            self.log.info("Attenuation set to {}".format(atten_now))
370            time.sleep(5)
371            thdn = self.record_audio_and_analyze_thdn()
372            if thdn > self.audio_params['thdn_threshold'] or thdn == 0:
373                atten_right = atten_mid
374                max_range = atten_right - 1
375            else:
376                atten_left = atten_mid
377                max_range = atten_left
378        self.log.info('Max range reached at {} dB'.format(max_range))
379        return max_range
380
381    def find_bt_max_range_linear_fine_search(self):
382
383        thdn = 0.03
384        atten_now = self.attenuator.get_atten()
385
386        while thdn < self.audio_params[
387                'thdn_threshold'] and thdn != 0 and atten_now < self.attenuation_vector[
388                    'max']:
389            atten_now = self.attenuator.get_atten()
390            self.log.info("Attenuation set to {}".format(atten_now))
391            thdn = self.record_audio_and_analyze_thdn()
392            self.log.info("THDN is {}".format(thdn))
393            self.attenuator.set_atten(atten_now +
394                                      self.attenuation_vector['step_fine'])
395        max_range = self.attenuator.get_atten(
396        ) - self.attenuation_vector['step_fine'] * 2
397        if thdn == 0:
398            self.log.warning(
399                "Music play stopped, link might get lost, max range reached at {} dB"
400                .format(max_range))
401        else:
402            self.log.info("Max range reached at {}".format(max_range))
403        if atten_now == self.attenuation_vector['max']:
404            self.log.warning("Fail to reach max range")
405        return max_range
406
407    def test_bisection_search_max(self):
408
409        #Find the BT max range under each angle using bisection search
410        max_range_all = []
411
412        for phi in self.positioner['phi_range']:
413
414            succeed = self.phi_card.moveTo(phi)
415            if succeed:
416                self.log.info("Phi positioner moved to {} degree".format(phi))
417            else:
418                self.log.warning(
419                    "Fail to move phi positioner to {} degree".format(phi))
420            self.log.info("Phi positioner moved to {} degree".format(phi))
421            max_ranges = [phi]
422
423            for theta in self.positioner['theta_range']:
424
425                succeed = self.theta_card.moveTo(theta)
426                if succeed:
427                    self.log.info(
428                        "Theta positioner moved to {} degree".format(theta))
429                else:
430                    self.log.warning(
431                        "Failed to move theta positioner to {} degree".format(
432                            theta))
433                self.log.info(
434                    "Theta positioner moved to {} degree".format(theta))
435
436                ramp_attenuation(self.attenuator,
437                                 self.attenuation_vector['min'])
438                time.sleep(2)
439                max_range = self.find_bt_max_range_bisection_search()
440                max_ranges.append(max_range)
441            max_range_all.append(max_ranges)
442        columns = ['Phi/Theta']
443        columns.extend(self.positioner['theta_range'])
444        df = pd.DataFrame(max_range_all, columns=columns)
445        df.to_csv(self.file_output, index=False)
446
447    def test_coarse_search(self):
448
449        #Coarse search to find the highest minimum attenuation can be set to
450        #be a starting point for all angles
451        thdn = 0.03
452        max_atten_reached = 0
453        ramp_attenuation(self.attenuator,
454                         self.attenuation_vector['start_coarse'])
455        self.log.info('Start attenuation at {} dB'.format(
456            self.attenuator.get_atten()))
457        while True:
458            atten_now = self.attenuator.get_atten()
459            if atten_now == self.attenuation_vector['max']:
460                if max_atten_reached > 1:
461                    self.log.warning(
462                        'Can not reach to the highest minimum, attenuator is already set to be max, need to add more attenuation'
463                    )
464                    break
465            for phi in self.positioner['phi_range']:
466                if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]:
467                    break
468                succeed = self.phi_card.moveTo(phi)
469                if succeed:
470                    self.log.info(
471                        "Phi positioner moved to {} degree".format(phi))
472                else:
473                    self.log.warning(
474                        "Fail to move phi positioner to {} degree".format(phi))
475                self.log.info("Phi positioner moved to {} degree".format(phi))
476
477                for theta in self.positioner['theta_range']:
478
479                    succeed = self.theta_card.moveTo(theta)
480                    if succeed:
481                        self.log.info(
482                            "Theta positioner moved to {} degree".format(
483                                theta))
484                    else:
485                        self.log.warning(
486                            "Failed to move theta positioner to {} degree".
487                            format(theta))
488                    self.log.info(
489                        "Theta positioner moved to {} degree".format(theta))
490
491                    thdn = self.record_audio_and_analyze_thdn()
492                    self.log.info(
493                        'THDN at thea {} degree, phi {} degree is {}'.format(
494                            theta, phi, thdn))
495                    if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]:
496                        break
497            if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]:
498                highest_max = self.attenuator.get_atten(
499                ) - self.attenuation_vector['step_coarse']
500                self.log.info(
501                    'Highest minimum attenuation is {} dB, fine search can start from there'
502                    .format(highest_max))
503                break
504            atten_new = min(atten_now + self.attenuation_vector['step_coarse'],
505                            self.attenuation_vector['max'])
506            if atten_new == self.attenuation_vector['max']:
507                max_atten_reached += 1
508            self.attenuator.set_atten(atten_new)
509            self.log.info('\nSetting attenuator to {} dB'.format(
510                self.attenuator.get_atten()))
511
512    def test_finestep_search_max(self):
513
514        #Find the BT max range under each angle with a finer step search
515        max_range_all = []
516        for phi in self.positioner['phi_range']:
517
518            succeed = self.phi_card.moveTo(phi)
519            if succeed:
520                self.log.info("Phi positioner moved to {} degree".format(phi))
521            else:
522                self.log.warning(
523                    "Fail to move phi positioner to {} degree".format(phi))
524            self.log.info("Phi positioner moved to {} degree".format(phi))
525            max_ranges = [phi]
526
527            for theta in self.positioner['theta_range']:
528
529                succeed = self.theta_card.moveTo(theta)
530                if succeed:
531                    self.log.info(
532                        "Theta positioner moved to {} degree".format(theta))
533                else:
534                    self.log.warning(
535                        "Failed to move theta positioner to {} degree".format(
536                            theta))
537                self.log.info(
538                    "Theta positioner moved to {} degree".format(theta))
539                connected = self.recover_bt_link()
540                if connected == 0:
541                    self.log.warning("Skip this angle as BT connection failed")
542                    max_range = self.attenuation_vector['max']
543                    return max_range
544                else:
545                    self.log.info('Connection restored')
546                ramp_attenuation(self.attenuator,
547                                 self.attenuation_vector['start_fine'])
548                max_range = self.find_bt_max_range_linear_fine_search()
549                max_ranges.append(max_range)
550            max_range_all.append(max_ranges)
551        columns = ['Phi/Theta']
552        columns.extend(self.positioner['theta_range'])
553        df_range = pd.DataFrame(max_range_all, columns=columns)
554        df_range.to_csv(self.file_output, index=False)
555