1#!/usr/bin/env python3
3#   Copyright 2020 - The Android Open Source Project
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
9#       http://www.apache.org/licenses/LICENSE-2.0
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16'''GNSS Base Class for Lab TTFF/FFPE'''
18import os
19import time
20import errno
21import re
22from collections import namedtuple
23from pandas import DataFrame, merge
24from acts_contrib.test_utils.gnss.gnss_defines import DEVICE_GPSLOG_FOLDER
25from acts_contrib.test_utils.gnss.gnss_defines import GPS_PKG_NAME
26from acts_contrib.test_utils.gnss.gnss_defines import BCM_GPS_XML_PATH
27from acts_contrib.test_utils.gnss import dut_log_test_utils as diaglog
28from acts_contrib.test_utils.gnss import gnss_test_utils as gutils
29from acts_contrib.test_utils.gnss import gnss_testlog_utils as glogutils
30from acts import utils
31from acts import signals
32from acts.controllers.gnss_lib import GnssSimulator
33from acts.context import get_current_context
34from acts.base_test import BaseTestClass
37def glob_re(dut, directory, regex_tag):
38    """glob with regular expression method.
39    Args:
40        dut: An AndroidDevice object.
41        directory: Target directory path.
42           Type, str
43        regex_tag: regular expression format string.
44           Type, str
45    Return:
46        result_ls: list of glob result
47    """
48    all_files_in_dir = os.listdir(directory)
49    dut.log.debug(f'glob_re dir: {all_files_in_dir}')
50    target_log_name_regx = re.compile(regex_tag)
51    tmp_ls = list(filter(target_log_name_regx.match, all_files_in_dir))
52    result_ls = [os.path.join(directory, file) for file in tmp_ls]
53    dut.log.debug(f'glob_re list: {result_ls}')
54    return result_ls
57class LabTtffTestBase(BaseTestClass):
58    """ LAB TTFF Tests Base Class"""
59    GTW_GPSTOOL_APP = 'gtw_gpstool_apk'
60    GNSS_SIMULATOR_KEY = 'gnss_sim_params'
61    CUSTOM_FILES_KEY = 'custom_files'
62    CSTTFF_CRITERIA = 'cs_criteria'
63    HSTTFF_CRITERIA = 'hs_criteria'
64    WSTTFF_CRITERIA = 'ws_criteria'
65    CSTTFF_PECRITERIA = 'cs_ttff_pecriteria'
66    HSTTFF_PECRITERIA = 'hs_ttff_pecriteria'
67    WSTTFF_PECRITERIA = 'ws_ttff_pecriteria'
68    TTFF_ITERATION = 'ttff_iteration'
69    SIMULATOR_LOCATION = 'simulator_location'
70    DIAG_OPTION = 'diag_option'
71    SCENARIO_POWER = 'scenario_power'
72    MDSAPP = 'mdsapp'
73    MASKFILE = 'maskfile'
74    MODEMPARFILE = 'modemparfile'
75    NV_DICT = 'nv_dict'
76    TTFF_TIMEOUT = 'ttff_timeout'
78    def __init__(self, controllers):
79        """ Initializes class attributes. """
81        super().__init__(controllers)
82        self.dut = None
83        self.gnss_simulator = None
84        self.rockbottom_script = None
85        self.gnss_log_path = self.log_path
86        self.gps_xml_bk_path = BCM_GPS_XML_PATH + '.bk'
87        self.gpstool_ver = ''
88        self.test_params = None
89        self.custom_files = None
90        self.maskfile = None
91        self.mdsapp = None
92        self.modemparfile = None
93        self.nv_dict = None
94        self.scenario_power = None
95        self.ttff_timeout = None
96        self.test_types = None
97        self.simulator_location = None
98        self.gnss_simulator_scenario = None
99        self.gnss_simulator_power_level = None
101    def setup_class(self):
102        super().setup_class()
104        # Update parameters by test case configurations.
105        test_param = self.TAG + '_params'
106        self.test_params = self.user_params.get(test_param, {})
107        if not self.test_params:
108            self.log.warning(test_param + ' was not found in the user '
109                             'parameters defined in the config file.')
111        # Override user_param values with test parameters
112        self.user_params.update(self.test_params)
114        # Unpack user_params with default values. All the usages of user_params
115        # as self attributes need to be included either as a required parameter
116        # or as a parameter with a default value.
118        # Required parameters
119        req_params = [
122            self.TTFF_ITERATION, self.GNSS_SIMULATOR_KEY, self.DIAG_OPTION,
123            self.GTW_GPSTOOL_APP
124        ]
125        self.unpack_userparams(req_param_names=req_params)
127        # Optional parameters
128        self.custom_files = self.user_params.get(self.CUSTOM_FILES_KEY,[])
129        self.maskfile = self.user_params.get(self.MASKFILE,'')
130        self.mdsapp = self.user_params.get(self.MDSAPP,'')
131        self.modemparfile = self.user_params.get(self.MODEMPARFILE,'')
132        self.nv_dict = self.user_params.get(self.NV_DICT,{})
133        self.scenario_power = self.user_params.get(self.SCENARIO_POWER, [])
134        self.ttff_timeout = self.user_params.get(self.TTFF_TIMEOUT, 60)
136        # Set TTFF Spec.
137        test_type = namedtuple('Type', ['command', 'criteria'])
138        self.test_types = {
139            'cs': test_type('Cold Start', self.cs_criteria),
140            'ws': test_type('Warm Start', self.ws_criteria),
141            'hs': test_type('Hot Start', self.hs_criteria)
142        }
144        self.dut = self.android_devices[0]
146        # GNSS Simulator Setup
147        self.simulator_location = self.gnss_sim_params.get(
148            self.SIMULATOR_LOCATION, [])
149        self.gnss_simulator_scenario = self.gnss_sim_params.get('scenario')
150        self.gnss_simulator_power_level = self.gnss_sim_params.get(
151            'power_level')
153        # Create gnss_simulator instance
154        gnss_simulator_key = self.gnss_sim_params.get('type')
155        gnss_simulator_ip = self.gnss_sim_params.get('ip')
156        gnss_simulator_port = self.gnss_sim_params.get('port')
157        if gnss_simulator_key == 'gss7000':
158            gnss_simulator_port_ctrl = self.gnss_sim_params.get('port_ctrl')
159        else:
160            gnss_simulator_port_ctrl = None
161        self.gnss_simulator = GnssSimulator.AbstractGnssSimulator(
162            gnss_simulator_key, gnss_simulator_ip, gnss_simulator_port,
163            gnss_simulator_port_ctrl)
165        # Unpack the rockbottom script file if its available.
166        if self.custom_files:
167            for file in self.custom_files:
168                if 'rockbottom_' + self.dut.model in file:
169                    self.rockbottom_script = file
170                    break
172    def setup_test(self):
174        self.clear_gps_log()
175        self.gnss_simulator.stop_scenario()
176        self.gnss_simulator.close()
177        if self.rockbottom_script:
178            self.log.info(
179                f'Running rockbottom script for this device {self.dut.model}')
180            self.dut_rockbottom()
181        else:
182            self.log.info(
183                f'Not running rockbottom for this device {self.dut.model}')
185        utils.set_location_service(self.dut, True)
186        gutils.reinstall_package_apk(self.dut, GPS_PKG_NAME,
187                                     self.gtw_gpstool_apk)
188        gpstool_ver_cmd = f'dumpsys package {GPS_PKG_NAME} | grep versionName'
189        self.gpstool_ver = self.dut.adb.shell(gpstool_ver_cmd).split('=')[1]
190        self.log.info(f'GTW GPSTool version: {self.gpstool_ver}')
192        # For BCM DUTs, delete gldata.sto and set IgnoreRomAlm="true" based on b/196936791#comment20
193        if self.diag_option == "BCM":
194            gutils.remount_device(self.dut)
195            # Backup gps.xml
196            if self.dut.file_exists(BCM_GPS_XML_PATH):
197                copy_cmd = f'cp {BCM_GPS_XML_PATH} {self.gps_xml_bk_path}'
198            elif self.dut.file_exists(self.gps_xml_bk_path):
199                self.log.debug(f'{BCM_GPS_XML_PATH} is missing')
200                self.log.debug(
201                    f'Copy {self.gps_xml_bk_path} and rename to {BCM_GPS_XML_PATH}'
202                )
203                copy_cmd = f'cp {self.gps_xml_bk_path} {BCM_GPS_XML_PATH}'
204            else:
205                self.log.error(
206                    f'Missing both {BCM_GPS_XML_PATH} and {self.gps_xml_bk_path} in DUT'
207                )
208                raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
209                                        self.gps_xml_bk_path)
210            self.dut.adb.shell(copy_cmd)
211            gutils.delete_bcm_nvmem_sto_file(self.dut)
212            gutils.bcm_gps_ignore_rom_alm(self.dut)
213            if self.current_test_name == "test_tracking_power_sweep":
214                gutils.bcm_gps_ignore_warmstandby(self.dut)
215            # Reboot DUT to apply the setting
216            gutils.reboot(self.dut)
217        self.gnss_simulator.connect()
219    def dut_rockbottom(self):
220        """
221        Set the dut to rockbottom state
223        """
224        # The rockbottom script might include a device reboot, so it is
225        # necessary to stop SL4A during its execution.
226        self.dut.stop_services()
227        self.log.info(f'Executing rockbottom script for {self.dut.model}')
228        os.chmod(self.rockbottom_script, 0o777)
229        os.system(f'{self.rockbottom_script} {self.dut.serial}')
230        # Make sure the DUT is in root mode after coming back
231        self.dut.root_adb()
232        # Restart SL4A
233        self.dut.start_services()
235    def teardown_test(self):
236        """Teardown settings for the test class"""
237        super().teardown_test()
238        # Restore the gps.xml everytime after the test.
239        if self.diag_option == "BCM":
240            # Restore gps.xml
241            gutils.remount_device(self.dut)
242            rm_cmd = f'rm -rf {BCM_GPS_XML_PATH}'
243            restore_cmd = f'cp {self.gps_xml_bk_path} {BCM_GPS_XML_PATH}'
244            self.dut.adb.shell(rm_cmd)
245            self.dut.adb.shell(restore_cmd)
247    def teardown_class(self):
248        """ Executed after completing all selected test cases."""
249        self.clear_gps_log()
250        if self.gnss_simulator:
251            self.gnss_simulator.stop_scenario()
252            self.gnss_simulator.close()
254    def start_set_gnss_power(self):
255        """
256        Start GNSS simulator secnario and set power level.
258        """
260        self.gnss_simulator.start_scenario(self.gnss_simulator_scenario)
261        time.sleep(25)
262        if self.scenario_power:
263            self.log.info(
264                'Set GNSS simulator power with power_level by scenario_power')
265            for setting in self.scenario_power:
266                power_level = setting.get('power_level', -130)
267                sat_system = setting.get('sat_system', '')
268                freq_band = setting.get('freq_band', 'ALL')
269                sat_id = setting.get('sat_id', '')
270                self.log.debug(f'sat: {sat_system}; freq_band: {freq_band}, '
271                               f'power_level: {power_level}, sat_id: {sat_id}')
272                self.gnss_simulator.set_scenario_power(power_level,
273                                                       sat_id,
274                                                       sat_system,
275                                                       freq_band)
276        else:
277            self.log.debug('Set GNSS simulator power '
278                           f'with power_level: {self.gnss_simulator_power_level}')
279            self.gnss_simulator.set_power(self.gnss_simulator_power_level)
281    def get_and_verify_ttff(self, mode):
282        """Retrieve ttff with designate mode.
284            Args:
285                mode: A string for identify gnss test mode.
286        """
287        if mode not in self.test_types:
288            raise signals.TestError(f'Unrecognized mode {mode}')
289        test_type = self.test_types.get(mode)
291        gutils.process_gnss_by_gtw_gpstool(self.dut,
292                                           self.test_types['cs'].criteria)
293        begin_time = gutils.get_current_epoch_time()
294        gutils.start_ttff_by_gtw_gpstool(self.dut,
295                                         ttff_mode=mode,
296                                         iteration=self.ttff_iteration,
297                                         raninterval=True,
298                                         hot_warm_sleep=3,
299                                         timeout=self.ttff_timeout)
300        # Since Wear takes little longer to update the TTFF info.
301        # Workround to solve the wearable timing issue
302        if gutils.is_device_wearable(self.dut):
303            time.sleep(20)
304        ttff_data = gutils.process_ttff_by_gtw_gpstool(self.dut, begin_time,
305                                                       self.simulator_location)
307        # Create folder for GTW GPStool's log
308        gps_log_path = os.path.join(self.gnss_log_path, 'GPSLogs')
309        os.makedirs(gps_log_path, exist_ok=True)
311        self.dut.adb.pull(f'{DEVICE_GPSLOG_FOLDER} {gps_log_path}')
312        local_log_dir = os.path.join(gps_log_path, 'files')
313        gps_api_log = glob_re(self.dut, local_log_dir, r'GNSS_\d+')
314        ttff_loop_log = glob_re(self.dut, local_log_dir,
315                                fr'\w+_{mode.upper()}_\d+')
317        if not gps_api_log and ttff_loop_log:
318            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
319                                    gps_log_path)
321        df_ttff_ffpe = DataFrame(glogutils.parse_gpstool_ttfflog_to_df(gps_api_log[0]))
323        ttff_dict = {}
324        for i in ttff_data:
325            data = ttff_data[i]._asdict()
326            ttff_dict[i] = dict(data)
328        ttff_data_df = DataFrame(ttff_dict).transpose()
329        ttff_data_df = ttff_data_df[[
330            'ttff_loop', 'ttff_sec', 'ttff_pe', 'ttff_haccu'
331        ]]
332        try:
333            df_ttff_ffpe = merge(df_ttff_ffpe, ttff_data_df, left_on='loop', right_on='ttff_loop')
334        except: # pylint: disable=bare-except
335            self.log.warning("Can't merge ttff_data and df.")
336        ttff_data_df.to_json(gps_log_path + '/gps_log_ttff_data.json',
337                             orient='table',
338                             index=False)
339        df_ttff_ffpe.to_json(gps_log_path + '/gps_log.json', orient='table', index=False)
340        result = gutils.check_ttff_data(self.dut,
341                                        ttff_data,
342                                        ttff_mode=test_type.command,
343                                        criteria=test_type.criteria)
344        if not result:
345            raise signals.TestFailure(
346                f'{test_type.command} TTFF fails to reach '
347                'designated criteria')
348        return ttff_data
350    def verify_pe(self, mode):
351        """
352        Verify ttff Position Error with designate mode.
354        Args:
355             mode: A string for identify gnss test mode.
356        """
358        ffpe_type = namedtuple('Type', ['command', 'pecriteria'])
359        ffpe_types = {
360            'cs': ffpe_type('Cold Start', self.cs_ttff_pecriteria),
361            'ws': ffpe_type('Warm Start', self.ws_ttff_pecriteria),
362            'hs': ffpe_type('Hot Start', self.hs_ttff_pecriteria)
363        }
365        if mode not in ffpe_types:
366            raise signals.TestError(f'Unrecognized mode {mode}')
367        test_type = ffpe_types.get(mode)
369        ttff_data = self.get_and_verify_ttff(mode)
370        result = gutils.check_ttff_pe(self.dut,
371                                      ttff_data,
372                                      ttff_mode=test_type.command,
373                                      pe_criteria=test_type.pecriteria)
374        if not result:
375            raise signals.TestFailure(
376                f'{test_type.command} TTFF fails to reach '
377                'designated criteria')
378        return ttff_data
380    def clear_gps_log(self):
381        """
382        Delete the existing GPS GTW Log from DUT.
384        """
385        self.dut.adb.shell(f'rm -rf {DEVICE_GPSLOG_FOLDER}')
387    def start_dut_gnss_log(self):
388        """Start GNSS chip log according to different diag_option"""
389        # Start GNSS chip log
390        if self.diag_option == "QCOM":
391            diaglog.start_diagmdlog_background(self.dut, maskfile=self.maskfile)
392        else:
393            gutils.start_pixel_logger(self.dut)
395    def stop_and_pull_dut_gnss_log(self, gnss_vendor_log_path=None):
396        """
397        Stop DUT GNSS logger and pull log into local PC dir
398            Arg:
399                gnss_vendor_log_path: gnss log path directory.
400                    Type, str.
401                    Default, None
402        """
403        if not gnss_vendor_log_path:
404            gnss_vendor_log_path = self.gnss_log_path
405        if self.diag_option == "QCOM":
406            diaglog.stop_background_diagmdlog(self.dut,
407                                              gnss_vendor_log_path,
408                                              keep_logs=False)
409        else:
410            gutils.stop_pixel_logger(self.dut)
411            self.log.info('Getting Pixel BCM Log!')
412            diaglog.get_pixellogger_bcm_log(self.dut,
413                                            gnss_vendor_log_path,
414                                            keep_logs=False)
416    def start_gnss_and_wait(self, wait=60):
417        """
418        The process of enable gnss and spend the wait time for GNSS to
419        gather enoung information that make sure the stability of testing.
421        Args:
422            wait: wait time between power sweep.
423                Type, int.
424                Default, 60.
425        """
426        # Create log path for waiting section logs of GPStool.
427        gnss_wait_log_dir = os.path.join(self.gnss_log_path, 'GNSS_wait')
429        # Enable GNSS to receive satellites' signals for "wait_between_pwr" seconds.
430        self.log.info('Enable GNSS for searching satellites')
431        gutils.start_gnss_by_gtw_gpstool(self.dut, state=True)
432        self.log.info(f'Wait for {wait} seconds')
433        time.sleep(wait)
435        # Stop GNSS and pull the logs.
436        gutils.start_gnss_by_gtw_gpstool(self.dut, state=False)
437        diaglog.get_gpstool_logs(self.dut, gnss_wait_log_dir, False)
439    def exe_eecoexer_loop_cmd(self, cmd_list=None):
440        """
441        Function for execute EECoexer command list
442            Args:
443                cmd_list: a list of EECoexer function command.
444                Type, list.
445        """
446        if cmd_list:
447            for cmd in cmd_list:
448                self.log.info('Execute EEcoexer Command: {}'.format(cmd))
449                gutils.execute_eecoexer_function(self.dut, cmd)
451    def gnss_ttff_ffpe(self,
452                       mode,
453                       sub_context_path='',
454                       coex_cmd='',
455                       stop_coex_cmd=''):
456        """
457        Base ttff and ffpe function
458            Args:
459                mode: Set the TTFF mode for testing. Definitions are as below.
460                      cs(cold start), ws(warm start), hs(hot start)
461                sub_context_path: Set specifc log pathfor ttff_ffpe
462        """
463        # Create log file path
464        full_output_path = get_current_context().get_full_output_path()
465        self.gnss_log_path = os.path.join(full_output_path, sub_context_path)
466        os.makedirs(self.gnss_log_path, exist_ok=True)
467        self.log.debug(f'Create log path: {self.gnss_log_path}')
469        # Start and set GNSS simulator
470        self.start_set_gnss_power()
472        # Start GNSS chip log
473        self.start_dut_gnss_log()
475        # Wait for acquiring almanac
476        if mode != 'cs':
477            wait_time = 900
478        else:
479            wait_time = 3
480        self.start_gnss_and_wait(wait=wait_time)
482        # Start Coex if available
483        if coex_cmd and stop_coex_cmd:
484            self.exe_eecoexer_loop_cmd(coex_cmd)
486        # Start verifying TTFF and FFPE
487        self.verify_pe(mode)
489        # Set gnss_vendor_log_path based on GNSS solution vendor
490        gnss_vendor_log_path = os.path.join(self.gnss_log_path,
491                                            self.diag_option)
492        os.makedirs(gnss_vendor_log_path, exist_ok=True)
494        # Stop GNSS chip log and pull the logs to local file system
495        self.stop_and_pull_dut_gnss_log(gnss_vendor_log_path)
497        # Stop Coex if available
498        if coex_cmd and stop_coex_cmd:
499            self.exe_eecoexer_loop_cmd(stop_coex_cmd)