1#!/usr/bin/env python3
3#   Copyright 2018 - The Android Open Source Project
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
9#       http://www.apache.org/licenses/LICENSE-2.0
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
17A comprehensive interface for performing test actions on an Apollo device.
20import time
22from acts.controllers.android_lib.tel.tel_utils import initiate_call
23from acts.controllers.android_lib.tel.tel_utils import wait_for_droid_in_call
24from acts.controllers.buds_lib.apollo_lib import DeviceError
25from acts.controllers.buds_lib.test_actions.agsa_acts import AgsaOTAError
26from acts.controllers.buds_lib.test_actions.base_test_actions import BaseTestAction
27from acts.controllers.buds_lib.test_actions.base_test_actions import timed_action
28from acts.controllers.buds_lib.test_actions.bt_utils import BTUtils
29from acts.libs.utils.timer import TimeRecorder
30from acts.utils import wait_until
32PACKAGE_NAME_AGSA = 'com.google.android.googlequicksearchbox'
33PACKAGE_NAME_GMS = 'com.google.android.gms'
34PACKAGE_NAME_NEARBY = 'com.google.android.gms.policy_nearby'
35PACKAGE_NAME_SETTINGS = 'com.android.settings'
41BISTO_MP_CONNECT_FAIL_TEXT = 'Can\'t connect to'
43BISTO_MP_CONNECTED_TEXT = 'Now set up your Google Assistant'
48    'HFP(pri.)': 'TRUE',
49    'A2DP(pri)': 'TRUE',
52    'HFP(pri.)': 'FALSE',
53    'A2DP(pri)': 'FALSE',
57AVRCPSTATUS = 'AvrcpPlayPause'
58DEFAULT_TIMEOUT = 60  # wait 60 seconds max for bond/connect.
59DEFAULT_CMD_INTERVAL = 0.5  # default interval between serial commands
60DEFAULT_CMD_RETRY = 5  # default retry times when a command failed.
62    'HFP Pri', 'HFP Sec', 'A2DP Pri', 'A2DP Sec', 'CTRL', 'AUDIO', 'DEBUG',
63    'TRANS'
65DEFAULT_BT_STATUS = ['A2DP(pri)', 'HFP(pri.)', 'Comp']
68class TestActsError(Exception):
69    """Exception from Apollo Acts Error."""
72class ApolloTestActions(BaseTestAction):
73    """Test action class for all Apollo test actions."""
75    def __init__(self, apollo_dev, logger=None):
76        """
77        Args:
78             apollo_dev: apollo.lib.apollo_lib.Device the Apollo device
79        """
80        super(ApolloTestActions, self).__init__(logger)
81        self.dut = apollo_dev
82        # need a embedded timer for connection time measurements.
83        self.measurement_timer = TimeRecorder()
85    def bluetooth_get_status(self):
86        status = self.dut.get_bt_status()
87        self.logger.info(status)
89    def wait_for_bluetooth_disconnection(self, timeout=60):
90        """ Set pairing mode and disconnect.
92        This action will wait until the apollo profiles are false.
94        Args:
95             timeout: integer, timeout value in seconds.
96        """
97        result = True
98        apollo_status = self.dut.get_bt_status()
99        self.logger.info('Waiting for the disconnection.')
100        time.sleep(1)
101        ini_time = time.time()
102        while len(apollo_status) != len(
103            [s for s in apollo_status.values() if s == 'FALSE']):
104            apollo_status = self.dut.get_bt_status()
105            if (time.time() - ini_time) > timeout:
106                self.logger.warning('Timeout waiting for the disconnection.')
107                return False
108            time.sleep(1)
109        return result
111    def pair(self, phone, companion_app=True):
112        """Pairs phone with apollo and validates bluetooth profiles.
114        Args:
115            phone: android phone
116            apollo: apollo device
117            companion_app (optional): True if the phone has a companion app
118                                      installed. False otherwise.
120        Raises:
121            TestActsError: Bluetooth pairing failed/ Dut BT status check failed.
122        """
123        bt_util = BTUtils()
124        target_addr = self.dut.bluetooth_address
125        if bt_util.android_device_in_connected_state(phone, target_addr):
126            self.logger.info('Already paired and connected, skipping pairing.')
127        else:
128            if bt_util.android_device_in_paired_state(phone, target_addr):
129                self.logger.info(
130                    'Device is paired but not connected, unpair first.')
131                if not bt_util.bt_unpair(phone, self.dut):
132                    raise TestActsError('Unable to unpair the device')
133            bt_util.bt_pair_and_connect(phone, self.dut)
134            self.logger.info('DEVICE PAIRED')
135            if companion_app:
136                profiles = PROFILES_CONNECTED.copy()
137                profiles.update(COMP_PROFILE_CONNECTED)
138            else:
139                profiles = PROFILES_CONNECTED
140            self.logger.info(profiles)
141            if not bt_util.check_device_bt(device=self.dut, profiles=profiles):
142                raise TestActsError('Dut BT status check failed.')
143            else:
144                return True
146    def unpair(self, phone, companion_app=True, factory_reset_dut=True):
147        """Unpairs phone from apollo and validates bluetooth profiles.
149        Args:
150            phone: android phone
151            apollo: apollo device
152            companion_app (optional): True if the phone has a companion app
153                                      installed. False otherwise.
155        Raises:
156            TestActsError: Bluetooth unpairing/Dut BT status check failed.
157        """
158        bt_util = BTUtils()
159        target_addr = self.dut.bluetooth_address
160        if not bt_util.android_device_in_paired_state(phone, target_addr):
161            self.logger.info('Device is already unpaired, skipping unpairing.')
162        else:
163            result = bt_util.bt_unpair(
164                phone, self.dut, factory_reset_dut=factory_reset_dut)
165            if not result:
166                raise TestActsError('Bluetooth unpairing failed.')
167            if companion_app:
168                profiles = PROFILES_DISCONNECTED.copy()
169                profiles.update(COMP_PROFILE_DISCONNECTED)
170            else:
171                profiles = PROFILES_DISCONNECTED
172            if not bt_util.check_device_bt(device=self.dut, profiles=profiles):
173                raise TestActsError('Dut BT status check failed.')
174            else:
175                return True
177    def is_paired(self, phone):
178        """Check if the given apollo is paired with the android device.
180        Args:
181            phone: android phone
182            apollo: apollo device
184        Returns:
185            Bool: True if apollo is paired with the phone.
186        """
187        bt_util = BTUtils()
188        target_addr = self.dut.bluetooth_address
189        return bt_util.android_device_in_paired_state(phone, target_addr)
191    def send_music_play_event_and_validate(self):
192        """Send the play event on Apollo and validate the response and DSP
193        Status.
195        Raises:
196            TestActsError: Error while playing the music.
197        """
198        play_detection_timeout = 1
199        if self.dut.is_streaming():
200            self.logger.info('Music already streaming. Skipping play event..')
201            return
202        self.logger.info('Playing video...')
203        is_played = self.dut.music_control_events(
204            AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PLAY_REGEX)
205        if not is_played:
206            self.logger.error('AVRCP Played status not found')
207            raise TestActsError('AVRCP Played status not found.')
208        wait_until(
209            lambda: self.dut.is_streaming(),
210            play_detection_timeout,
211            sleep_s=0.25)
212        if not self.dut.is_streaming():
213            self.logger.error('Device is NOT in a deviceA2DPStreaming state')
214            raise TestActsError(
215                'Device is NOT in a deviceA2DPStreaming state.')
217    def send_music_pause_event_and_validate(self):
218        """Send the pause event on Apollo and validate the responses and DSP
219        Status.
221        Raises:
222            TestActsError: Error while pausing the music.
223        """
224        paused_detection_timeout = 10
225        if not self.dut.is_streaming():
226            self.logger.info('Music not streaming. Skipping pause event..')
227            return
228        self.logger.info("Pausing video...")
229        is_paused = self.dut.music_control_events(
230            AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PAUSE_REGEX)
231        if not is_paused:
232            self.logger.error('AVRCP Paused statue not found')
233            raise TestActsError('AVRCP Paused status not found.')
234        wait_until(
235            lambda: not self.dut.is_streaming(),
236            paused_detection_timeout,
237            sleep_s=0.25)
238        if self.dut.is_streaming():
239            self.logger.error('Device is still in deviceA2DPStreaming state')
240            raise TestActsError(
241                'Device is still in deviceA2DPStreaming state.')
243    def vol_down_and_validate(self):
244        """Send volume down twice and validate by comparing two levels
246        Raises:
247            TestActsError: Error
248        """
249        self.logger.info('Decreasing volume')
250        before_vol = self.dut.volume('Down', 1)
251        time.sleep(2)
252        after_vol = self.dut.volume('Down', 1)
253        if not after_vol or not before_vol or after_vol >= before_vol:
254            self.logger.error(
255                'Unable to decrease the volume. Before: %s. After: %s' %
256                (before_vol, after_vol))
257            raise TestActsError('error decreasing volume')
259    def vol_up_and_validate(self):
260        """Send volume up twice and validate by comparing two levels
262        Raises:
263            TestActsError: Error
264        """
265        self.logger.info('Increasing volume')
266        before_vol = self.dut.volume('Up', 1)
267        time.sleep(2)
268        after_vol = self.dut.volume('Up', 1)
269        if not after_vol or not before_vol or after_vol <= before_vol:
270            self.logger.error(
271                'Unable to increase the volume. Before: %s. After: %s' %
272                (before_vol, after_vol))
273            raise TestActsError('error increasing volume')
275    def call_and_validate_ringing(self,
276                                  calling_phone,
277                                  number_to_call,
278                                  call_retries=10):
279        for i in range(call_retries):
280            initiate_call(self.logger, calling_phone, number_to_call)
281            is_calling = wait_for_droid_in_call(
282                self.logger, calling_phone, max_time=10)
283            if is_calling:
284                self.logger.info('Call initiated!')
285                break
286            else:
287                self.logger.warning('Call is not initiating.')
288                if i == call_retries:
289                    self.logger.error('Call initiation retries exhausted')
290                    raise TestActsError(
291                        '%s retries failed to initiate the call' %
292                        (call_retries))
293            self.logger.warning('Retrying call...')
294        # wait for offhook state and return
295        wait_until(
296            (lambda: calling_phone.droid.telecomGetCallState() == 'OFFHOOK'),
297            timeout_s=40,
298            condition=True,
299            sleep_s=.5)
300        self.logger.info('Phone call initiated on %s' % calling_phone.serial)
302    def answer_phone_and_validate_call_received(self, receiving_phone):
303        # wait until the phone rings (assumes that a call is initiated prior to
304        # running the command)
305        wait_until(
306            lambda: receiving_phone.droid.telecomGetCallState() == 'RINGING',
307            timeout_s=40,
308            condition=True,
309            sleep_s=.5)
310        self.logger.info('Ring detected on %s - now answering the call...' %
311                         (receiving_phone.serial))
312        # answer the phone call
313        self.dut.tap()
314        # wait until OFFHOOK state
315        wait_until(
316            lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK',
317            timeout_s=40,
318            condition=True,
319            sleep_s=.5)
321    def hangup_phone_and_validate_call_hung(self, receiving_phone):
322        # wait for phone to be in OFFHOOK state (assumed that a call is answered
323        # and engaged)
324        wait_until(
325            lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK',
326            timeout_s=40,
327            condition=True,
328            sleep_s=.5)
329        # end the call (post and pre 1663 have different way of ending call)
330        self.logger.info(
331            'Hanging up the call on %s...' % receiving_phone.serial)
332        if self.dut.version < 1663:
333            self.dut.tap()
334        else:
335            self.dut.hold(duration=100)
336        # wait for idle state
337        wait_until(
338            lambda: receiving_phone.droid.telecomGetCallState() == 'IDLE',
339            timeout_s=40,
340            condition=True,
341            sleep_s=.5)
343    @timed_action
344    def factory_reset(self):
345        ret = False
346        try:
347            self.dut.factory_reset()
348            ret = True
349        except DeviceError as ex:
350            self.logger.warning('Failed to reset Apollo: %s' % ex)
351        return ret
353    @timed_action
354    def wait_for_magic_pairing_notification(self, android_act, timeout=60):
355        dut_detected = False
356        start_time = time.time()
357        self.logger.info('Waiting for MP prompt: %s' % BISTO_MP_DEVICE_TEXT)
358        while not dut_detected:
359            android_act.dut.ui_util.uia.wait.update()
360            self.sleep(1)
361            if android_act.dut.ui_util.uia(
362                    textContains=BISTO_MP_DETECT_HEADER, enabled=True).exists:
363                if android_act.dut.ui_util.uia(
364                        textContains=BISTO_MP_DEVICE_TEXT,
365                        enabled=True).exists:
366                    self.logger.info('DUT Apollo MP prompt detected!')
367                    dut_detected = True
368                else:
369                    self.logger.info(
370                        'NONE DUT Apollo MP prompt detected! Cancel and RETRY!'
371                    )
372                    android_act.dut.ui_util.click_by_text(BISTO_MP_CANCEL_TEXT)
373            if time.time() - start_time > timeout:
374                break
375        if not dut_detected:
376            self.logger.info(
377                'Failed to get %s MP prompt' % BISTO_MP_DEVICE_TEXT)
378        return dut_detected
380    @timed_action
381    def start_magic_pairing(self, android_act, timeout=30, retries=3):
382        paired = False
383        android_act.dut.ui_util.click_by_text(
384            BISTO_MP_CONNECT_TEXT, timeout=timeout)
385        connect_start_time = time.time()
386        count = 0
387        timeout = 30
389        while not paired and count < retries:
390            android_act.dut.ui_util.uia.wait.update()
391            self.sleep(1)
392            if time.time() - connect_start_time > timeout:
393                self.logger.info('Time out! %s seconds' % time)
394                android_act.app_force_close_agsa()
395                self.logger.info('Timeout(s): %s' % timeout)
396                break
397            if android_act.dut.ui_util.uia(
398                    textContains=BISTO_MP_CONNECT_FAIL_TEXT,
399                    enabled=True).exists:
400                count += 1
401                self.logger.info('MP FAILED! Retry %s.' % count)
402                android_act.dut.ui_util.click_by_text(
403                    BISTO_MP_CONNECT_RETRY_TEXT)
404                connect_start_time = time.time()
405            elif android_act.dut.ui_util.uia(
406                    textContains=BISTO_MP_CONNECTED_TEXT, enabled=True).exists:
407                self.logger.info('MP SUCCESSFUL! Exiting AGSA...')
408                paired = True
409                android_act.dut.ui_util.click_by_text(
410                    BISTO_MP_CONNECTED_EXIT_TEXT)
411                android_act.dut.ui_util.wait_for_text(
412                    BISTO_MP_EXIT_PROMPT_TEXT)
413                android_act.dut.ui_util.click_by_text(
414                    BISTO_MP_EXIT_CONFIRM_TEXT)
415        return paired
417    @timed_action
418    def turn_bluetooth_on(self):
419        self.dut.cmd('pow 1')
420        return True
422    @timed_action
423    def turn_bluetooth_off(self):
424        self.dut.cmd('pow 0')
425        return True
427    @timed_action
428    def wait_for_bluetooth_a2dp_hfp(self,
429                                    timeout=DEFAULT_TIMEOUT,
430                                    interval=DEFAULT_CMD_INTERVAL):
431        """Wait for BT connection by checking if A2DP and HFP connected.
433        This is used for BT pair+connect test.
435        Args:
436            timeout: float, timeout value in second.
437            interval: float, float, interval between polling BT profiles.
438            timer: TimeRecorder, time recorder to save the connection time.
439        """
440        # Need to check these two profiles
441        pass_profiles = ['A2DP Pri', 'HFP Pri']
442        # TODO(b/122730302): Change to just raise an error
443        ret = False
444        try:
445            ret = self._wait_for_bluetooth_profile_connection(
446                pass_profiles, timeout, interval, self.measurement_timer)
447        except DeviceError as ex:
448            self.logger.warning('Failed to wait for BT connection: %s' % ex)
449        return ret
451    def _wait_for_bluetooth_profile_connection(self, profiles_to_check,
452                                               timeout, interval, timer):
453        """A generic method to wait for specified BT profile connection.
455        Args:
456            profiles_to_check: list, profile names (A2DP, HFP, etc.) to be
457                               checked.
458            timeout: float, timeout value in second.
459            interval: float, interval between polling BT profiles.
460            timer: TimeRecorder, time recorder to save the connection time.
462        Returns:
463            bool, True if checked profiles are connected, False otherwise.
464        """
465        timer.start_timer(profiles_to_check, force=True)
466        start_time = time.time()
467        while time.time() - start_time < timeout:
468            profiles = self._bluetooth_check_profile_connection()
469            for profile in profiles:
470                if profiles[profile]:
471                    timer.stop_timer(profile)
472            # now check if the specified profile connected.
473            all_connected = True
474            for profile in profiles_to_check:
475                if not profiles[profile]:
476                    all_connected = False
477                    break
478            if all_connected:
479                return True
480            time.sleep(interval)
481        # make sure the profile timer are stopped.
482        timer.stop_timer(profiles_to_check)
483        return False
485    def _bluetooth_check_profile_connection(self):
486        """Return profile connection in a boolean dict.
488        key=<profile name>, val = T/F
489        """
490        profiles = dict()
491        output = self.dut.get_conn_devices()
492        # need to strip all whitespaces.
493        conn_devs = {}
495        for key in output:
496            conn_devs[key.strip()] = output[key].strip()
497        for key in conn_devs:
498            self.logger.info('%s:%s' % (key, conn_devs[key]))
499            if 'XXXXXXXX' in conn_devs[key]:
500                profiles[key] = conn_devs[key]
501            else:
502                profiles[key] = False
503        return profiles
505    @timed_action
506    def wait_for_bluetooth_status_connection_all(
507            self, timeout=DEFAULT_TIMEOUT, interval=DEFAULT_CMD_INTERVAL):
508        """Wait for BT connection by checking if A2DP, HFP and COMP connected.
510        This is used for BT reconnect test.
512        Args:
513            timeout: float, timeout value in second.
514            interval: float, float, interval between polling BT profiles.
515        """
516        ret = False
517        self.measurement_timer.start_timer(DEFAULT_BT_STATUS, force=True)
518        # All profile not connected by default.
519        connected_status = {key: False for key in DEFAULT_BT_STATUS}
520        start_time = time.time()
521        while time.time() < start_time + timeout:
522            try:
523                time.sleep(interval)
524                status = self.dut.get_bt_status()
525                for key in DEFAULT_BT_STATUS:
526                    if (not connected_status[key] and key in status
527                            and 'TRUE' == status[key]):
528                        self.measurement_timer.stop_timer(key)
529                        connected_status[key] = True
530                        self.logger.info(
531                            'BT status %s connected at %fs.' %
532                            (key, self.measurement_timer.elapsed(key)))
533                if False not in connected_status.values():
534                    ret = True
535                    break
536            except DeviceError as ex:
537                self.logger.warning(
538                    'Device exception when waiting for reconnection: %s' % ex)
539        self.measurement_timer.stop_timer(DEFAULT_BT_STATUS)
540        return ret
542    def initiate_ota_via_agsa_verify_transfer_completion_in_logcat(
543            self,
544            agsa_action,
545            dfu_path,
546            destination=None,
547            force=True,
548            apply_image=True,
549            reconnect=True):
550        """
551        Starts an OTA by issuing an intent to AGSA after copying the dfu file to
552        the appropriate location on the phone
554        Args:
555            agsa_action: projects.agsa.lib.test_actions.agsa_acts
556                         .AgsaTestActions
557            dfu_path: string - absolute path of dfu file
558            destination: string - absolute path of file on phone if not
559                         specified will use
560                         /storage/emulated/0/Android/data/com.google.android
561                         .googlequicksearchbox/files/download_cache/apollo.dfu
562            force: value set in the intent sent to AGSA
563            True if success False otherwise
564        """
565        try:
566            agsa_action.initiate_agsa_and_wait_until_transfer(
567                dfu_path, destination=destination, force=force)
568            if apply_image:
569                # set in case
570                self.dut.set_in_case(reconnect=reconnect)
571        except AgsaOTAError as ex:
572            self.logger.error('Failed to OTA via AGSA %s' % ex)
573            return False
574        except DeviceError as ex:
575            self.logger.error('Failed to bring up device %s' % ex)
576            return False
577        return True
579    @timed_action
580    def wait_for_bluetooth_a2dp_hfp_rfcomm_connect(
581            self, address, timeout=DEFAULT_TIMEOUT,
582            interval=DEFAULT_CMD_INTERVAL):
583        """Wait for BT reconnection by checking if A2DP, HFP and COMP connected
584        to the specified address.
586        This is used for BT connection switch test.
588        Args:
589            address: str, MAC of the address to connect.
590            timeout: float, timeout value in second.
591            interval: float, float, interval between polling BT profiles.
593        Returns:
594            True if the specified address is connected. False otherwise.
595        """
596        last_4_hex = address.replace(':', '')[-4:].lower()
597        profiles_to_check = ['HFP Pri', 'A2DP Pri', 'CTRL', 'AUDIO']
598        self.measurement_timer.start_timer(profiles_to_check, force=True)
599        end_time = time.time() + timeout
600        all_connected = True
601        while time.time() < end_time:
602            all_connected = True
603            profiles = self._bluetooth_check_profile_connection()
604            for profile in profiles_to_check:
605                if (profile in profiles and profiles[profile]
606                        and last_4_hex in profiles[profile].lower()):
607                    self.measurement_timer.stop_timer(profile)
608                else:
609                    all_connected = False
610            if all_connected:
611                break
612            time.sleep(interval)
613        # make sure the profile timer are stopped.
614        self.measurement_timer.stop_timer(profiles_to_check)
616        return all_connected