#!/usr/bin/env python3.5
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import time
import re
import statistics
from datetime import datetime
from acts import utils
from acts import signals
from acts.base_test import BaseTestClass
from acts_contrib.test_utils.gnss.testtracker_util import log_testtracker_uuid
from acts_contrib.test_utils.tel.tel_logging_utils import start_adb_tcpdump
from acts_contrib.test_utils.tel.tel_logging_utils import stop_adb_tcpdump
from acts_contrib.test_utils.tel.tel_logging_utils import get_tcpdump_log
from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode
from acts_contrib.test_utils.gnss import gnss_test_utils as gutils

# Maps a location request type to the logcat keyword that marks one report of
# that type.
CONCURRENCY_TYPE = {
    "gnss": "GNSS location received",
    "gnss_meas": "GNSS measurement received",
    "ap_location": "reportLocation"
}

# gps.xml attributes added to / removed from the "gll" tag per start mode.
GPS_XML_CONFIG = {
    "CS": {
        'IgnorePosition': 'true', 'IgnoreEph': 'true',
        'IgnoreTime': 'true', 'AsstIgnoreLto': 'true',
        'IgnoreJniTime': 'true',
    },
    "WS": {
        'IgnorePosition': 'true', 'AsstIgnoreLto': 'true',
        'IgnoreJniTime': 'true',
    },
    "HS": {}
}

# gps.xml attribute values used to switch the on-chip stop notification.
ONCHIP_CONFIG = {
    "enable": {"EnableOnChipStopNotification": "1"},
    "disable": {"EnableOnChipStopNotification": "2"},
}


class GnssConcurrencyTest(BaseTestClass):
    """ GNSS Concurrency TTFF Tests. """

    def setup_class(self):
        """ Unpack the required user params and initialize the device. """
        super().setup_class()
        self.ad = self.android_devices[0]
        req_params = [
            "standalone_cs_criteria", "chre_tolerate_rate", "qdsp6m_path",
            "outlier_criteria", "max_outliers", "pixel_lab_location",
            "max_interval", "onchip_interval", "ttff_test_cycle"
        ]
        self.unpack_userparams(req_param_names=req_params)
        gutils._init_device(self.ad)
        self.ad.adb.shell("setprop persist.vendor.radio.adb_log_on 0")
        self.ad.adb.shell("sync")

    def setup_test(self):
        """ Start the loggers and load the CHRE nanoapp before each test. """
        gutils.log_current_epoch_time(self.ad, "test_start_time")
        log_testtracker_uuid(self.ad, self.current_test_name)
        gutils.clear_logd_gnss_qxdm_log(self.ad)
        gutils.start_pixel_logger(self.ad)
        start_adb_tcpdump(self.ad)
        # related properties
        gutils.check_location_service(self.ad)
        gutils.get_baseband_and_gms_version(self.ad)
        self.load_chre_nanoapp()

    def teardown_test(self):
        """ Stop the loggers started in setup_test. """
        gutils.stop_pixel_logger(self.ad)
        stop_adb_tcpdump(self.ad)
        gutils.log_current_epoch_time(self.ad, "test_end_time")

    def on_fail(self, test_name, begin_time):
        """ Collect bug report, QXDM log and tcpdump log of a failed test. """
        self.ad.take_bug_report(test_name, begin_time)
        gutils.get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
        get_tcpdump_log(self.ad, test_name, begin_time)

    def is_brcm_test(self):
        """ Check the test is for BRCM and skip if not.

        Raises:
            signals.TestSkip: the chipset vendor is Qualcomm.
        """
        if gutils.check_chipset_vendor_by_qualcomm(self.ad):
            raise signals.TestSkip("Not BRCM chipset. Skip the test.")

    def load_chre_nanoapp(self):
        """ Load CHRE nanoapp to target Android Device.

        Tries up to three times; the device is rebooted whenever the load
        command raises.

        Raises:
            signals.TestError: the nanoapp was not loaded after all attempts.
        """
        for _ in range(0, 3):
            try:
                self.ad.log.info("Start to load the nanoapp")
                cmd = "chre_power_test_client load"
                if gutils.is_device_wearable(self.ad):
                    extra_cmd = "tcm /vendor/etc/chre/power_test_tcm.so"
                    cmd = " ".join([cmd, extra_cmd])
                res = self.ad.adb.shell(cmd)
                if "result 1" in res:
                    self.ad.log.info("Nano app loaded successfully")
                    break
            # Deliberately broad: any failure of the load command is treated
            # as retryable after a reboot.
            except Exception as e:
                self.ad.log.warning("Nano app loaded fail: %s" % e)
                gutils.reboot(self.ad)
        else:
            # Reached only when no attempt hit the `break` above.
            raise signals.TestError("Failed to load CHRE nanoapp")

    def enable_chre(self, interval_sec):
        """ Enable or disable gnss concurrency via nanoapp.

        Args:
            interval_sec: a number for the request interval in seconds,
                set 0 as disable.
        """
        if interval_sec == 0:
            self.ad.log.info("Stop CHRE request")
        else:
            self.ad.log.info(
                f"Initiate CHRE with {interval_sec} seconds interval")
        interval_msec = interval_sec * 1000
        cmd = "chre_power_test_client"
        option = "enable %d" % interval_msec if interval_msec != 0 else "disable"

        # Only the CHRE request types are driven through the nanoapp client;
        # the "ap_location" entry is skipped.
        for request_type in CONCURRENCY_TYPE:
            if "ap" not in request_type:
                self.ad.adb.shell(" ".join([cmd, request_type, option]))

    def parse_concurrency_result(self,
                                 begin_time,
                                 request_type,
                                 criteria,
                                 exam_lower=True):
        """ Parse the test result with given time and criteria.

        The interval between every two consecutive logcat entries of the
        request type is compared against the bounds derived from criteria,
        chre_tolerate_rate and outlier_criteria.

        Args:
            begin_time: test begin time.
            request_type: str for location request type.
            criteria: a number for the expected report interval in seconds.
            exam_lower: a boolean to identify the lower bond or not.
        Return:
            A tuple (outliers, failures, results) of interval lists in
            seconds. results is empty when only one log entry was found.
        Raises:
            signals.TestFailure: no log entry found for the request type.
        """
        results = []
        failures = []
        outliers = []
        tolerated = criteria * (1 + self.chre_tolerate_rate)
        upper_bound = tolerated + self.outlier_criteria
        lower_bound = criteria * (
            1 - self.chre_tolerate_rate) - self.outlier_criteria
        search_results = self.ad.search_logcat(CONCURRENCY_TYPE[request_type],
                                               begin_time)
        if not search_results:
            raise signals.TestFailure(f"No log entry found for keyword:"
                                      f"{CONCURRENCY_TYPE[request_type]}")

        for previous, target in zip(search_results, search_results[1:]):
            timedelt = target["datetime_obj"] - previous["datetime_obj"]
            timedelt_sec = timedelt.total_seconds()
            results.append(timedelt_sec)
            res_tag = ""
            if timedelt_sec > upper_bound:
                failures.append(timedelt_sec)
                res_tag = "Failure"
            elif timedelt_sec < lower_bound and exam_lower:
                failures.append(timedelt_sec)
                res_tag = "Failure"
            elif timedelt_sec > tolerated:
                outliers.append(timedelt_sec)
                res_tag = "Outlier"
            if res_tag:
                self.ad.log.error(
                    f"[{res_tag}][{target['time_stamp']}]:{timedelt_sec:.2f} sec"
                )

        # NOTE(review): the summary omits the first interval (results[1:])
        # while the statistics below use all of them - confirm this is
        # intended.
        res_summary = " ".join([str(res) for res in results[1:]])
        self.ad.log.info(f"[{request_type}]Overall Result: {res_summary}")
        log_prefix = f"TestResult {request_type}"
        self.ad.log.info(f"{log_prefix}_samples {len(search_results)}")
        self.ad.log.info(f"{log_prefix}_outliers {len(outliers)}")
        self.ad.log.info(f"{log_prefix}_failures {len(failures)}")
        if results:
            self.ad.log.info(f"{log_prefix}_max_time {max(results):.2f}")
        else:
            # A single log entry yields no interval; max() on an empty list
            # would raise ValueError, so return the empty result instead and
            # let the caller decide.
            self.ad.log.warning(
                f"[{request_type}] Only one log entry found, "
                "no interval to evaluate")

        return outliers, failures, results

    def run_gnss_concurrency_test(self, criteria, test_duration):
        """ Execute GNSS concurrency test steps.

        Args:
            criteria: a dict of request type to interval in seconds, with
                the keys "ap_location", "gnss" and "gnss_meas".
            test_duration: int for test duration.
        """
        self.enable_chre(criteria["gnss"])
        TTFF_criteria = criteria["ap_location"] + self.standalone_cs_criteria
        gutils.process_gnss_by_gtw_gpstool(
            self.ad, TTFF_criteria, freq=criteria["ap_location"])
        self.ad.log.info("Tracking 10 sec to prevent flakiness.")
        time.sleep(10)
        begin_time = datetime.now()
        self.ad.log.info(f"Test Start at {begin_time}")
        time.sleep(test_duration)
        self.enable_chre(0)
        gutils.start_gnss_by_gtw_gpstool(self.ad, False)
        self.validate_location_test_result(begin_time, criteria)

    def run_chre_only_test(self, criteria, test_duration):
        """ Execute CHRE only test steps.

        Args:
            criteria: a dict of request type to interval in seconds, with
                the keys "gnss" and "gnss_meas".
            test_duration: int for test duration.
        """
        begin_time = datetime.now()
        self.ad.log.info(f"Test Start at {begin_time}")
        self.enable_chre(criteria["gnss"])
        time.sleep(test_duration)
        self.enable_chre(0)
        self.validate_location_test_result(begin_time, criteria)

    def validate_location_test_result(self, begin_time, request):
        """ Validate GNSS concurrency/CHRE test results.

        Args:
            begin_time: test begin time, passed to the logcat search.
            request: a dict of request type to interval in seconds.
        Raises:
            signals.TestFailure: any request type has no interval, has a
                failure, or has more outliers than max_outliers.
        """
        results = {}
        outliers = {}
        failures = {}
        failure_log = ""
        for request_type, criteria in request.items():
            # Intervals below one second are evaluated against one second.
            criteria = criteria if criteria > 1 else 1
            self.ad.log.info("Starting process %s result" % request_type)
            outliers[request_type], failures[request_type], results[
                request_type] = self.parse_concurrency_result(
                    begin_time, request_type, criteria, exam_lower=False)
            if not results[request_type]:
                failure_log += "[%s] Fail to find location report.\n" % request_type
            if len(failures[request_type]) > 0:
                failure_log += "[%s] Test exceeds criteria(%.2f): %.2f\n" % (
                    request_type, criteria, max(failures[request_type]))
            if len(outliers[request_type]) > self.max_outliers:
                failure_log += "[%s] Outliers excceds max amount: %d\n" % (
                    request_type, len(outliers[request_type]))

        if failure_log:
            failure_log += f"The test begins at {begin_time}\n"
            raise signals.TestFailure(failure_log)

    def run_engine_switching_test(self, freq):
        """ Conduct engine switching test with given frequency.

        Args:
            freq: a list identify source1/2 frequency [freq1, freq2]
        """
        request = {"ap_location": self.max_interval}
        begin_time = datetime.now()
        self.ad.droid.startLocating(freq[0] * 1000, 0)
        time.sleep(10)
        # Start and stop the second source five times while the first one
        # keeps running.
        for _ in range(5):
            gutils.start_gnss_by_gtw_gpstool(self.ad, True, freq=freq[1])
            time.sleep(10)
            gutils.start_gnss_by_gtw_gpstool(self.ad, False)
        self.ad.droid.stopLocating()
        self.calculate_position_error(begin_time)
        self.validate_location_test_result(begin_time, request)

    def calculate_position_error(self, begin_time):
        """ Calculate the position error for the logcat search results.

        Logs the maximum position error of all "reportLocation" entries
        relative to pixel_lab_location. Nothing is calculated when no entry
        is found.

        Args:
            begin_time: test begin time
        Raises:
            ValueError: a "reportLocation" entry has no lat/lon pair.
        """
        position_errors = []
        # search for location like 25.000717,121.455163
        regex = re.compile(r"(-?\d{1,5}\.\d{1,10}),\s*(-?\d{1,5}\.\d{1,10})")
        search_results = self.ad.search_logcat("reportLocation", begin_time)
        for entry in search_results:
            # Keep the log entry and the regex match in separate names so the
            # error message below can still quote the original log text.
            match = regex.search(entry["log_message"])
            if not match:
                raise ValueError("lat/lon does not found. "
                                 f"original text: {entry['log_message']}")
            lat = float(match.group(1))
            lon = float(match.group(2))
            pe = gutils.calculate_position_error(lat, lon,
                                                 self.pixel_lab_location)
            position_errors.append(pe)
        if not position_errors:
            # max() on an empty list would raise ValueError and hide the
            # failure reported by the result validation that follows.
            self.ad.log.warning(
                "No reportLocation entry found, skip position error")
            return
        self.ad.log.info("TestResult max_position_error %.2f" %
                         max(position_errors))

    def get_chre_ttff(self, interval_sec, duration):
        """ Get the TTFF for the first CHRE report.

        Args:
            interval_sec: test interval in seconds for CHRE.
            duration: test duration.
        Raises:
            signals.TestFailure: a CHRE request type has no report.
        """
        begin_time = datetime.now()
        self.ad.log.info(f"Test start at {begin_time}")
        self.enable_chre(interval_sec)
        time.sleep(duration)
        self.enable_chre(0)
        for request_type, pattern in CONCURRENCY_TYPE.items():
            if request_type == "ap_location":
                continue
            search_results = self.ad.search_logcat(pattern, begin_time)
            if not search_results:
                raise signals.TestFailure(
                    f"Unable to receive {request_type} report in {duration} seconds")
            ttff_stamp = search_results[0]["datetime_obj"]
            self.ad.log.info(search_results[0]["time_stamp"])
            ttff = (ttff_stamp - begin_time).total_seconds()
            self.ad.log.info(f"CHRE {request_type} TTFF = {ttff}")

    def add_ttff_conf(self, conf_type):
        """ Add mcu ttff config to gps.xml

        Args:
            conf_type: a string identify the config type
        """
        gutils.bcm_gps_xml_update_option(
            self.ad, child_tag="gll", items_to_update=GPS_XML_CONFIG[conf_type])

    def update_gps_conf(self, update_attrib):
        """ Update gps.xml content

        Args:
            update_attrib: a dict of attributes to update under the gll tag
        """
        gutils.bcm_gps_xml_update_option(
            self.ad, child_tag="gll", items_to_update=update_attrib)

    def delete_gps_conf(self, conf_type):
        """ Delete gps.xml content

        Args:
            conf_type: a string identify the config type
        """
        gutils.bcm_gps_xml_update_option(
            self.ad, child_tag="gll", items_to_delete=GPS_XML_CONFIG[conf_type].keys())

    def preset_mcu_test(self, mode):
        """ Presetting mcu test with config and device state

        Args:
            mode: a string identify the test type
        """
        self.add_ttff_conf(mode)
        gutils.push_lhd_overlay(self.ad)
        toggle_airplane_mode(self.ad.log, self.ad, new_state=True)
        self.update_gps_conf(ONCHIP_CONFIG["enable"])
        gutils.clear_aiding_data_by_gtw_gpstool(self.ad)
        # NOTE(review): the device object is passed as the first argument of
        # its own reboot(); load_chre_nanoapp uses gutils.reboot(self.ad)
        # instead - confirm which one is intended.
        self.ad.reboot(self.ad)
        self.load_chre_nanoapp()

    def reset_mcu_test(self, mode):
        """ Resetting mcu test with config and device state

        Args:
            mode: a string identify the test type
        """
        self.delete_gps_conf(mode)
        self.update_gps_conf(ONCHIP_CONFIG["disable"])

    def get_mcu_ttff(self):
        """ Get mcu ttff seconds

        Polls logcat every 10 seconds, up to 6 times, for the fix report
        after sending the CHRE enable command.

        Return:
            ttff: a float identify ttff seconds, or 60 when no fix report
                is found in time.
        Raises:
            signals.TestError: the fix report has no TTFF value.
        """
        search_res = ""
        search_pattern = "$PGLOR,0,FIX"
        ttff_regex = r"FIX,(.*)\*"
        cmd_base = "chre_power_test_client gnss tcm"
        cmd_start = " ".join([cmd_base, "enable 1000"])
        cmd_stop = " ".join([cmd_base, "disable"])
        begin_time = datetime.now()

        self.ad.log.info("Send CHRE enable to DUT")
        self.ad.adb.shell(cmd_start)
        for _ in range(6):
            search_res = self.ad.search_logcat(search_pattern, begin_time)
            if search_res:
                break
            time.sleep(10)
        else:
            self.ad.adb.shell(cmd_stop)
            self.ad.log.error("Unable to get mcu ttff in 60 seconds")
            return 60
        self.ad.adb.shell(cmd_stop)

        log_message = search_res[0]["log_message"]
        res = re.search(ttff_regex, log_message)
        if not res:
            # Fail with a clear message instead of AttributeError on None.
            raise signals.TestError(
                f"Unable to parse mcu ttff from: {log_message}")
        ttff = res.group(1)
        self.ad.log.info(f"TTFF = {ttff}")
        return float(ttff)

    def run_mcu_ttff_loops(self, mode, loops):
        """ Run mcu ttff with given mode and loops

        Args:
            mode: a string identify mode cs/ws/hs.
            loops: a int to identify the number of loops
        """
        ttff_res = []
        for i in range(loops):
            ttff = self.get_mcu_ttff()
            self.ad.log.info(f"{mode} TTFF LOOP{i+1} = {ttff}")
            ttff_res.append(ttff)
            time.sleep(10)
        self.ad.log.info(f"TestResult {mode}_MAX_TTFF {max(ttff_res)}")
        self.ad.log.info(
            f"TestResult {mode}_AVG_TTFF {statistics.mean(ttff_res)}")

    def _run_mcu_ttff_test(self, mode):
        """ Preset, run and always reset one mcu ttff test.

        Args:
            mode: a string identify the test type, a key of GPS_XML_CONFIG
        """
        self.preset_mcu_test(mode)
        try:
            self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
        finally:
            # Restore gps.xml even when the loops fail, so the test-only
            # config is not left on the device.
            self.reset_mcu_test(mode)

    # Concurrency Test Cases
    def test_gnss_concurrency_location_1_chre_1(self):
        test_duration = 15
        criteria = {"ap_location": 1, "gnss": 1, "gnss_meas": 1}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_1_chre_8(self):
        test_duration = 30
        criteria = {"ap_location": 1, "gnss": 8, "gnss_meas": 8}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_15_chre_8(self):
        test_duration = 60
        criteria = {"ap_location": 15, "gnss": 8, "gnss_meas": 8}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_61_chre_1(self):
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 1, "gnss_meas": 1}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_61_chre_10(self):
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 10, "gnss_meas": 10}
        self.run_gnss_concurrency_test(criteria, test_duration)

    # CHRE Only Test Cases
    def test_gnss_chre_1(self):
        test_duration = 15
        criteria = {"gnss": 1, "gnss_meas": 1}
        self.run_chre_only_test(criteria, test_duration)

    def test_gnss_chre_8(self):
        test_duration = 30
        criteria = {"gnss": 8, "gnss_meas": 8}
        self.run_chre_only_test(criteria, test_duration)

    # Interval tests
    def test_variable_interval_via_chre(self):
        test_duration = 10
        intervals = [0.1, 0.5, 1.5]
        for interval in intervals:
            self.get_chre_ttff(interval, test_duration)

    def test_variable_interval_via_framework(self):
        test_duration = 10
        intervals = [0, 0.5, 1.5]
        for interval in intervals:
            begin_time = datetime.now()
            self.ad.droid.startLocating(interval * 1000, 0)
            time.sleep(test_duration)
            self.ad.droid.stopLocating()
            # Intervals below one second are evaluated against one second.
            criteria = interval if interval > 1 else 1
            self.parse_concurrency_result(begin_time, "ap_location", criteria)

    # Engine switching test
    def test_gps_engine_switching_host_to_onchip(self):
        self.is_brcm_test()
        freq = [1, self.onchip_interval]
        self.run_engine_switching_test(freq)

    def test_gps_engine_switching_onchip_to_host(self):
        self.is_brcm_test()
        freq = [self.onchip_interval, 1]
        self.run_engine_switching_test(freq)

    # MCU TTFF test
    def test_mcu_cs_ttff(self):
        self._run_mcu_ttff_test("CS")

    def test_mcu_ws_ttff(self):
        self._run_mcu_ttff_test("WS")

    def test_mcu_hs_ttff(self):
        self._run_mcu_ttff_test("HS")