1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
import datetime
import re
import time

from acts.controllers.adb_lib.error import AdbError
from acts.controllers.buds_lib.test_actions.base_test_actions import BaseTestAction
from acts.controllers.buds_lib.test_actions.base_test_actions import timed_action
23
# Default on-device location the DFU image is pushed to when the caller does
# not supply a destination (inside the googlequicksearchbox app's data dir).
PHONE_DFU_PATH = (
    "/storage/emulated/0/Android/data/"
    "com.google.android.googlequicksearchbox"
    "/files/download_cache/apollo.dfu"
)

# Shell command template for the 'action_ota' broadcast. The two %s slots are,
# in order: the on-device DFU path and the is_force flag ('true' / 'false').
# The command is later tokenised with str.split(), so runs of spaces are
# harmless.
AGSA_BROADCAST = (
    "am  broadcast -a 'action_ota' "
    "--es dfu_url %s "
    "--es build_label 9.9.9 "
    "--ez is_force %s "
    "com.google.android.googlequicksearchbox/"
    "com.google.android.apps.gsa.broadcastreceiver.CommonBroadcastReceiver"
)
31
32
class AgsaOTAError(Exception):
    """Error raised when an AGSA-driven OTA does not complete.

    In this module it is raised when waiting for the OTA transfer times out.
    """
35
36
class AgsaTestActions(BaseTestAction):
    """AGSA test action library.

    Groups the AGSA-related actions run against one Android device: pushing a
    DFU image and broadcasting the OTA request, waiting for the OTA transfer
    to show up as complete in logcat, and installing a given AGSA version.
    """

    # How far back (seconds) each logcat query looks for completion messages.
    _OTA_LOG_LOOKBACK_SECS = 5
    # Pause (seconds) between logcat polls so adb is not queried back-to-back.
    _OTA_POLL_INTERVAL_SECS = 1

    def __init__(self, android_dev, logger=None):
        """
        Simple init code to keep the android object for future reference.

        Args:
           android_dev: devcontrollers.android_device.AndroidDevice
           logger: optional logger, forwarded to the BaseTestAction
                   constructor.
        """
        super().__init__(logger)
        self.dut = android_dev

    @timed_action
    def _initiate_agsa_ota(self, file_path, destination=None, force=True):
        """Pushes the dfu file to phone and issues broadcast to start AGSA OTA

        Args:
            file_path: (string) path of dfu file
            destination: (string) destination path on the phone; uses
                         PHONE_DFU_PATH if not specified
            force: (bool) sent as the broadcast's 'is_force' extra
                   ('true' / 'false')

        Returns:
            True if the broadcast output contains 'result=0'. False if the
            file could not be pushed or the broadcast did not report
            'result=0'.
        """
        if not destination:
            destination = PHONE_DFU_PATH

        # Without the image on the phone there is nothing to broadcast;
        # report the failure explicitly instead of falling through to None.
        if not self.dut.push_file_to_phone(file_path, destination):
            self.logger.error(
                'Failed to push %s to %s' % (file_path, destination))
            return False

        force_flag = 'true' if force else 'false'
        command = AGSA_BROADCAST % (destination, force_flag)
        # The shell call receives the command as a list of tokens.
        output = self.dut.adb.shell(command.split())
        if 'result=0' in output:
            self.logger.info('Agsa broadcast successful!')
            return True

        self.logger.error('Agsa broadcast failed')
        return False

    @timed_action
    def _wait_for_ota_to_finish(self, timeout=660):
        """Polls logcat until the OTA transfer is reported as complete

        Args:
           timeout: (int) seconds to wait before timing out.

        Returns:
            True on success

        Raises: AgsaOTAError if the timeout is reached.
        """
        # regex that confirms completion
        completion_pattern = re.compile(
            r'OTA progress: 100 %|OTA img rcvd')
        lookback = datetime.timedelta(seconds=self._OTA_LOG_LOOKBACK_SECS)
        # Monotonic clock for the deadline so wall-clock adjustments on the
        # host cannot shorten or extend the wait.
        deadline = time.monotonic() + timeout
        self.logger.info('Waiting for OTA transfer to complete....')
        while True:
            # Only consider log lines from the last few seconds; the
            # timestamp is passed to the logcat filter as its start time.
            window_start = datetime.datetime.now() - lookback
            try:
                # filter logcat for 'Devicelog:' entries
                filtered_log = self.dut.logcat_filter_message(
                    window_start.strftime('%m-%d %H:%M:%S.000'),
                    'Devicelog:')
            except AdbError:
                # gets thrown if no matching string is found
                filtered_log = None

            if filtered_log and completion_pattern.search(filtered_log):
                self.logger.info('Transfer completed!')
                return True

            if time.monotonic() > deadline:
                self.logger.error('Timed out waiting for OTA to complete.')
                raise AgsaOTAError('Timed out waiting for OTA to complete.')

            # Short pause (well inside the lookback window) between polls.
            time.sleep(self._OTA_POLL_INTERVAL_SECS)

    @timed_action
    def initiate_agsa_and_wait_until_transfer(self, file_path, destination=None,
                                              force=True, timeout=660):
        """Calls _initiate_agsa_ota and _wait_for_ota_to_finish

        Args:
            file_path: (string) path of dfu file
            destination: (string) destination path on the phone; uses
                         PHONE_DFU_PATH if not specified
            force: (bool) sent as the broadcast's 'is_force' extra
            timeout: (int) seconds to wait for the transfer to complete

        Returns:
            True on success and False if the OTA could not be initiated

        Raises: AgsaOTAError if the transfer does not complete in time.
        """
        # Do not wait out the whole timeout for an OTA that never started.
        if not self._initiate_agsa_ota(file_path, destination, force):
            self.logger.error(
                'AGSA OTA was not initiated; not waiting for transfer.')
            return False
        return self._wait_for_ota_to_finish(timeout)

    @timed_action
    def install_agsa(self, version, force=False):
        """
        Installs the specified version of AGSA if different from the one
        currently installed, or unconditionally when force is True.

        Args:
            version: (string) ex: '7.14.21.release'
            force: (bool) False installs only when the currently installed
                   version differs from the requested one. True installs
                   regardless, by-passing the version check
        Return:
            True if the requested version is already installed (and force is
            False) or the install succeeds, False otherwise
        """
        current_version = self.dut.get_agsa_version()
        # The 'alpha' / 'release' markers are dropped before comparing
        # against the installed version string.
        version_core = version.replace('alpha', '').replace('release', '')
        if version_core in current_version and not force:
            # Nothing to do counts as success rather than an implicit None.
            self.logger.info(
                'AGSA version %s already installed (current: %s)' %
                (version, current_version))
            return True

        self.logger.info('Current AGSA version is %s' % current_version)
        self.logger.info('Installing AGSA version %s...' % version)
        # NOTE(review): 'and_actions' is not assigned anywhere in this class;
        # presumably it is provided by BaseTestAction or attached by the
        # caller -- confirm, otherwise this raises AttributeError.
        if self.and_actions.install_agsa(version):
            self.logger.info('Install success!')
            return True

        self.logger.error('Failed to install version %s' % version)
        return False
154