1#!/usr/bin/env python3.5
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import time
18import re
19import statistics
20from datetime import datetime
21from acts import utils
22from acts import signals
23from acts.base_test import BaseTestClass
24from acts_contrib.test_utils.gnss.testtracker_util import log_testtracker_uuid
25from acts_contrib.test_utils.tel.tel_logging_utils import start_adb_tcpdump
26from acts_contrib.test_utils.tel.tel_logging_utils import stop_adb_tcpdump
27from acts_contrib.test_utils.tel.tel_logging_utils import get_tcpdump_log
28from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode
29from acts_contrib.test_utils.gnss import gnss_test_utils as gutils
30
# Logcat keyword searched for each request type. The keys that do not contain
# "ap" are also used as chre_power_test_client sub-commands by enable_chre().
CONCURRENCY_TYPE = {
    "gnss": "GNSS location received",
    "gnss_meas": "GNSS measurement received",
    "ap_location": "reportLocation",
}

# gps.xml attributes written to / removed from the "gll" tag for each MCU TTFF
# mode key (presumably cold/warm/hot start -- confirm against the GNSS config).
GPS_XML_CONFIG = {
    "CS": {
        "IgnorePosition": "true",
        "IgnoreEph": "true",
        "IgnoreTime": "true",
        "AsstIgnoreLto": "true",
        "IgnoreJniTime": "true",
    },
    "WS": {
        "IgnorePosition": "true",
        "AsstIgnoreLto": "true",
        "IgnoreJniTime": "true",
    },
    "HS": {},
}

# gps.xml attribute values applied before ("enable") and after ("disable")
# the MCU TTFF tests.
ONCHIP_CONFIG = {
    "enable": {
        "EnableOnChipStopNotification": "1",
    },
    "disable": {
        "EnableOnChipStopNotification": "2",
    },
}
54
55
56class GnssConcurrencyTest(BaseTestClass):
57    """ GNSS Concurrency TTFF Tests. """
58
59    def setup_class(self):
60        super().setup_class()
61        self.ad = self.android_devices[0]
62        req_params = [
63            "standalone_cs_criteria", "chre_tolerate_rate", "qdsp6m_path",
64            "outlier_criteria", "max_outliers", "pixel_lab_location",
65            "max_interval", "onchip_interval", "ttff_test_cycle"
66        ]
67        self.unpack_userparams(req_param_names=req_params)
68        gutils._init_device(self.ad)
69        self.ad.adb.shell("setprop persist.vendor.radio.adb_log_on 0")
70        self.ad.adb.shell("sync")
71
    def setup_test(self):
        """ Prepare logging and the CHRE nanoapp before each test case.

        Logs the test start epoch time and the TestTracker UUID of the current
        test, clears previous logs and starts the Pixel logger and adb tcpdump
        (as the helper names indicate), then loads the CHRE nanoapp.
        """
        gutils.log_current_epoch_time(self.ad, "test_start_time")
        log_testtracker_uuid(self.ad, self.current_test_name)
        gutils.clear_logd_gnss_qxdm_log(self.ad)
        gutils.start_pixel_logger(self.ad)
        start_adb_tcpdump(self.ad)
        # Check the location service and fetch baseband / GMS versions
        # (presumably for the test log -- confirm in gnss_test_utils).
        gutils.check_location_service(self.ad)
        gutils.get_baseband_and_gms_version(self.ad)
        self.load_chre_nanoapp()
82
    def teardown_test(self):
        """ Stop the loggers started in setup_test and log the end time. """
        gutils.stop_pixel_logger(self.ad)
        stop_adb_tcpdump(self.ad)
        gutils.log_current_epoch_time(self.ad, "test_end_time")
87
    def on_fail(self, test_name, begin_time):
        """ Collect debugging artifacts after a test failure.

        Args:
            test_name: name of the failed test, used to label the artifacts.
            begin_time: begin time of the failed test.
        """
        self.ad.take_bug_report(test_name, begin_time)
        # qdsp6m_path comes from the user parameters unpacked in setup_class.
        gutils.get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
        get_tcpdump_log(self.ad, test_name, begin_time)
92
93    def is_brcm_test(self):
94        """ Check the test is for BRCM and skip if not. """
95        if gutils.check_chipset_vendor_by_qualcomm(self.ad):
96            raise signals.TestSkip("Not BRCM chipset. Skip the test.")
97
98    def load_chre_nanoapp(self):
99        """ Load CHRE nanoapp to target Android Device. """
100        for _ in range(0, 3):
101            try:
102                self.ad.log.info("Start to load the nanoapp")
103                cmd = "chre_power_test_client load"
104                if gutils.is_device_wearable(self.ad):
105                    extra_cmd = "tcm /vendor/etc/chre/power_test_tcm.so"
106                    cmd = " ".join([cmd, extra_cmd])
107                res = self.ad.adb.shell(cmd)
108                if "result 1" in res:
109                    self.ad.log.info("Nano app loaded successfully")
110                    break
111            except Exception as e:
112                self.ad.log.warning("Nano app loaded fail: %s" % e)
113                gutils.reboot(self.ad)
114        else:
115            raise signals.TestError("Failed to load CHRE nanoapp")
116
117    def enable_chre(self, interval_sec):
118        """ Enable or disable gnss concurrency via nanoapp.
119
120        Args:
121            interval_sec: an int for frequency, set 0 as disable.
122        """
123        if interval_sec == 0:
124            self.ad.log.info(f"Stop CHRE request")
125        else:
126            self.ad.log.info(
127                f"Initiate CHRE with {interval_sec} seconds interval")
128        interval_msec = interval_sec * 1000
129        cmd = "chre_power_test_client"
130        option = "enable %d" % interval_msec if interval_msec != 0 else "disable"
131
132        for type in CONCURRENCY_TYPE.keys():
133            if "ap" not in type:
134                self.ad.adb.shell(" ".join([cmd, type, option]))
135
136    def parse_concurrency_result(self,
137                                 begin_time,
138                                 request_type,
139                                 criteria,
140                                 exam_lower=True):
141        """ Parse the test result with given time and criteria.
142
143        Args:
144            begin_time: test begin time.
145            request_type: str for location request type.
146            criteria: dictionary for test criteria.
147            exam_lower: a boolean to identify the lower bond or not.
148        Return: List for the failure and outlier loops and results.
149        """
150        results = []
151        failures = []
152        outliers = []
153        upper_bound = criteria * (
154            1 + self.chre_tolerate_rate) + self.outlier_criteria
155        lower_bound = criteria * (
156            1 - self.chre_tolerate_rate) - self.outlier_criteria
157        search_results = self.ad.search_logcat(CONCURRENCY_TYPE[request_type],
158                                               begin_time)
159        if not search_results:
160            raise signals.TestFailure(f"No log entry found for keyword:"
161                                      f"{CONCURRENCY_TYPE[request_type]}")
162
163        for i in range(len(search_results) - 1):
164            target = search_results[i + 1]
165            timedelt = target["datetime_obj"] - search_results[i]["datetime_obj"]
166            timedelt_sec = timedelt.total_seconds()
167            results.append(timedelt_sec)
168            res_tag = ""
169            if timedelt_sec > upper_bound:
170                failures.append(timedelt_sec)
171                res_tag = "Failure"
172            elif timedelt_sec < lower_bound and exam_lower:
173                failures.append(timedelt_sec)
174                res_tag = "Failure"
175            elif timedelt_sec > criteria * (1 + self.chre_tolerate_rate):
176                outliers.append(timedelt_sec)
177                res_tag = "Outlier"
178            if res_tag:
179                self.ad.log.error(
180                    f"[{res_tag}][{target['time_stamp']}]:{timedelt_sec:.2f} sec"
181                )
182
183        res_summary = " ".join([str(res) for res in results[1:]])
184        self.ad.log.info(f"[{request_type}]Overall Result: {res_summary}")
185        log_prefix = f"TestResult {request_type}"
186        self.ad.log.info(f"{log_prefix}_samples {len(search_results)}")
187        self.ad.log.info(f"{log_prefix}_outliers {len(outliers)}")
188        self.ad.log.info(f"{log_prefix}_failures {len(failures)}")
189        self.ad.log.info(f"{log_prefix}_max_time {max(results):.2f}")
190
191        return outliers, failures, results
192
193    def run_gnss_concurrency_test(self, criteria, test_duration):
194        """ Execute GNSS concurrency test steps.
195
196        Args:
197            criteria: int for test criteria.
198            test_duration: int for test duration.
199        """
200        self.enable_chre(criteria["gnss"])
201        TTFF_criteria = criteria["ap_location"] + self.standalone_cs_criteria
202        gutils.process_gnss_by_gtw_gpstool(
203            self.ad, TTFF_criteria, freq=criteria["ap_location"])
204        self.ad.log.info("Tracking 10 sec to prevent flakiness.")
205        time.sleep(10)
206        begin_time = datetime.now()
207        self.ad.log.info(f"Test Start at {begin_time}")
208        time.sleep(test_duration)
209        self.enable_chre(0)
210        gutils.start_gnss_by_gtw_gpstool(self.ad, False)
211        self.validate_location_test_result(begin_time, criteria)
212
213    def run_chre_only_test(self, criteria, test_duration):
214        """ Execute CHRE only test steps.
215
216        Args:
217            criteria: int for test criteria.
218            test_duration: int for test duration.
219        """
220        begin_time = datetime.now()
221        self.ad.log.info(f"Test Start at {begin_time}")
222        self.enable_chre(criteria["gnss"])
223        time.sleep(test_duration)
224        self.enable_chre(0)
225        self.validate_location_test_result(begin_time, criteria)
226
227    def validate_location_test_result(self, begin_time, request):
228        """ Validate GNSS concurrency/CHRE test results.
229
230        Args:
231            begin_time: epoc of test begin time
232            request: int for test criteria.
233        """
234        results = {}
235        outliers = {}
236        failures = {}
237        failure_log = ""
238        for request_type, criteria in request.items():
239            criteria = criteria if criteria > 1 else 1
240            self.ad.log.info("Starting process %s result" % request_type)
241            outliers[request_type], failures[request_type], results[
242                request_type] = self.parse_concurrency_result(
243                    begin_time, request_type, criteria, exam_lower=False)
244            if not results[request_type]:
245                failure_log += "[%s] Fail to find location report.\n" % request_type
246            if len(failures[request_type]) > 0:
247                failure_log += "[%s] Test exceeds criteria(%.2f): %.2f\n" % (
248                    request_type, criteria, max(failures[request_type]))
249            if len(outliers[request_type]) > self.max_outliers:
250                failure_log += "[%s] Outliers excceds max amount: %d\n" % (
251                    request_type, len(outliers[request_type]))
252
253        if failure_log:
254            failure_log += f"The test begins at {begin_time}\n"
255            raise signals.TestFailure(failure_log)
256
257    def run_engine_switching_test(self, freq):
258        """ Conduct engine switching test with given frequency.
259
260        Args:
261            freq: a list identify source1/2 frequency [freq1, freq2]
262        """
263        request = {"ap_location": self.max_interval}
264        begin_time = datetime.now()
265        self.ad.droid.startLocating(freq[0] * 1000, 0)
266        time.sleep(10)
267        for i in range(5):
268            gutils.start_gnss_by_gtw_gpstool(self.ad, True, freq=freq[1])
269            time.sleep(10)
270            gutils.start_gnss_by_gtw_gpstool(self.ad, False)
271        self.ad.droid.stopLocating()
272        self.calculate_position_error(begin_time)
273        self.validate_location_test_result(begin_time, request)
274
275    def calculate_position_error(self, begin_time):
276        """ Calculate the position error for the logcat search results.
277
278        Args:
279            begin_time: test begin time
280        """
281        position_errors = []
282        search_results = self.ad.search_logcat("reportLocation", begin_time)
283        for result in search_results:
284            # search for location like 25.000717,121.455163
285            regex = r"(-?\d{1,5}\.\d{1,10}),\s*(-?\d{1,5}\.\d{1,10})"
286            result = re.search(regex, result["log_message"])
287            if not result:
288                raise ValueError("lat/lon does not found. "
289                                 f"original text: {result['log_message']}")
290            lat = float(result.group(1))
291            lon = float(result.group(2))
292            pe = gutils.calculate_position_error(lat, lon,
293                                                 self.pixel_lab_location)
294            position_errors.append(pe)
295        self.ad.log.info("TestResult max_position_error %.2f" %
296                         max(position_errors))
297
298    def get_chre_ttff(self, interval_sec, duration):
299        """ Get the TTFF for the first CHRE report.
300
301        Args:
302            interval_sec: test interval in seconds for CHRE.
303            duration: test duration.
304        """
305        begin_time = datetime.now()
306        self.ad.log.info(f"Test start at {begin_time}")
307        self.enable_chre(interval_sec)
308        time.sleep(duration)
309        self.enable_chre(0)
310        for type, pattern in CONCURRENCY_TYPE.items():
311            if type == "ap_location":
312                continue
313            search_results = self.ad.search_logcat(pattern, begin_time)
314            if not search_results:
315                raise signals.TestFailure(
316                    f"Unable to receive {type} report in {duration} seconds")
317            else:
318                ttff_stamp = search_results[0]["datetime_obj"]
319                self.ad.log.info(search_results[0]["time_stamp"])
320                ttff = (ttff_stamp - begin_time).total_seconds()
321                self.ad.log.info(f"CHRE {type} TTFF = {ttff}")
322
323    def add_ttff_conf(self, conf_type):
324        """ Add mcu ttff config to gps.xml
325
326        Args:
327            conf_type: a string identify the config type
328        """
329        gutils.bcm_gps_xml_update_option(
330            self.ad, child_tag="gll", items_to_update=GPS_XML_CONFIG[conf_type])
331
332    def update_gps_conf(self, update_attrib):
333        """ Update gps.xml content
334
335        Args:
336            search_line: target content
337            update_line: update content
338        """
339        gutils.bcm_gps_xml_update_option(
340            self.ad, child_tag="gll", items_to_update=update_attrib)
341
342    def delete_gps_conf(self, conf_type):
343        """ Delete gps.xml content
344
345        Args:
346            conf_type: a string identify the config type
347        """
348        gutils.bcm_gps_xml_update_option(
349            self.ad, child_tag="gll", items_to_delete=GPS_XML_CONFIG[conf_type].keys())
350
351    def preset_mcu_test(self, mode):
352        """ Preseting mcu test with config and device state
353
354        mode:
355            mode: a string identify the test type
356        """
357        self.add_ttff_conf(mode)
358        gutils.push_lhd_overlay(self.ad)
359        toggle_airplane_mode(self.ad.log, self.ad, new_state=True)
360        self.update_gps_conf(ONCHIP_CONFIG["enable"])
361        gutils.clear_aiding_data_by_gtw_gpstool(self.ad)
362        self.ad.reboot(self.ad)
363        self.load_chre_nanoapp()
364
365    def reset_mcu_test(self, mode):
366        """ Resetting mcu test with config and device state
367
368        mode:
369            mode: a string identify the test type
370        """
371        self.delete_gps_conf(mode)
372        self.update_gps_conf(ONCHIP_CONFIG["disable"])
373
374    def get_mcu_ttff(self):
375        """ Get mcu ttff seconds
376
377        Return:
378            ttff: a float identify ttff seconds
379        """
380        search_res = ""
381        search_pattern = "$PGLOR,0,FIX"
382        ttff_regex = r"FIX,(.*)\*"
383        cmd_base = "chre_power_test_client gnss tcm"
384        cmd_start = " ".join([cmd_base, "enable 1000"])
385        cmd_stop = " ".join([cmd_base, "disable"])
386        begin_time = datetime.now()
387
388        self.ad.log.info("Send CHRE enable to DUT")
389        self.ad.adb.shell(cmd_start)
390        for i in range(6):
391            search_res = self.ad.search_logcat(search_pattern, begin_time)
392            if search_res:
393                break
394            time.sleep(10)
395        else:
396            self.ad.adb.shell(cmd_stop)
397            self.ad.log.error("Unable to get mcu ttff in 60 seconds")
398            return 60
399        self.ad.adb.shell(cmd_stop)
400
401        res = re.search(ttff_regex, search_res[0]["log_message"])
402        ttff = res.group(1)
403        self.ad.log.info(f"TTFF = {ttff}")
404        return float(ttff)
405
406    def run_mcu_ttff_loops(self, mode, loops):
407        """ Run mcu ttff with given mode and loops
408
409        Args:
410            mode: a string identify mode cs/ws/hs.
411            loops: a int to identify the number of loops
412        """
413        ttff_res = []
414        for i in range(10):
415            ttff = self.get_mcu_ttff()
416            self.ad.log.info(f"{mode} TTFF LOOP{i+1} = {ttff}")
417            ttff_res.append(ttff)
418            time.sleep(10)
419        self.ad.log.info(f"TestResult {mode}_MAX_TTFF {max(ttff_res)}")
420        self.ad.log.info(
421            f"TestResult {mode}_AVG_TTFF {statistics.mean(ttff_res)}")
422
423    # Concurrency Test Cases
424    def test_gnss_concurrency_location_1_chre_1(self):
425        test_duration = 15
426        criteria = {"ap_location": 1, "gnss": 1, "gnss_meas": 1}
427        self.run_gnss_concurrency_test(criteria, test_duration)
428
429    def test_gnss_concurrency_location_1_chre_8(self):
430        test_duration = 30
431        criteria = {"ap_location": 1, "gnss": 8, "gnss_meas": 8}
432        self.run_gnss_concurrency_test(criteria, test_duration)
433
434    def test_gnss_concurrency_location_15_chre_8(self):
435        test_duration = 60
436        criteria = {"ap_location": 15, "gnss": 8, "gnss_meas": 8}
437        self.run_gnss_concurrency_test(criteria, test_duration)
438
439    def test_gnss_concurrency_location_61_chre_1(self):
440        test_duration = 120
441        criteria = {"ap_location": 61, "gnss": 1, "gnss_meas": 1}
442        self.run_gnss_concurrency_test(criteria, test_duration)
443
444    def test_gnss_concurrency_location_61_chre_10(self):
445        test_duration = 120
446        criteria = {"ap_location": 61, "gnss": 10, "gnss_meas": 10}
447        self.run_gnss_concurrency_test(criteria, test_duration)
448
449    # CHRE Only Test Cases
450    def test_gnss_chre_1(self):
451        test_duration = 15
452        criteria = {"gnss": 1, "gnss_meas": 1}
453        self.run_chre_only_test(criteria, test_duration)
454
455    def test_gnss_chre_8(self):
456        test_duration = 30
457        criteria = {"gnss": 8, "gnss_meas": 8}
458        self.run_chre_only_test(criteria, test_duration)
459
460    # Interval tests
461    def test_variable_interval_via_chre(self):
462        test_duration = 10
463        intervals = [0.1, 0.5, 1.5]
464        for interval in intervals:
465            self.get_chre_ttff(interval, test_duration)
466
467    def test_variable_interval_via_framework(self):
468        test_duration = 10
469        intervals = [0, 0.5, 1.5]
470        for interval in intervals:
471            begin_time = datetime.now()
472            self.ad.droid.startLocating(interval * 1000, 0)
473            time.sleep(test_duration)
474            self.ad.droid.stopLocating()
475            criteria = interval if interval > 1 else 1
476            self.parse_concurrency_result(begin_time, "ap_location", criteria)
477
478    # Engine switching test
479    def test_gps_engine_switching_host_to_onchip(self):
480        self.is_brcm_test()
481        freq = [1, self.onchip_interval]
482        self.run_engine_switching_test(freq)
483
484    def test_gps_engine_switching_onchip_to_host(self):
485        self.is_brcm_test()
486        freq = [self.onchip_interval, 1]
487        self.run_engine_switching_test(freq)
488
489    def test_mcu_cs_ttff(self):
490        mode = "CS"
491        self.preset_mcu_test(mode)
492        self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
493        self.reset_mcu_test(mode)
494
495    def test_mcu_ws_ttff(self):
496        mode = "WS"
497        self.preset_mcu_test(mode)
498        self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
499        self.reset_mcu_test(mode)
500
501    def test_mcu_hs_ttff(self):
502        mode = "HS"
503        self.preset_mcu_test(mode)
504        self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
505        self.reset_mcu_test(mode)
506