1#!/usr/bin/env python3
2#
3#   Copyright 2020 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import time
17import re
18import os
19import pathlib
20import math
21import shutil
22import fnmatch
23import posixpath
24import subprocess
25import tempfile
26import functools
27from retry import retry
28from collections import namedtuple
29from datetime import datetime, timedelta
30from xml.etree import ElementTree
31from contextlib import contextmanager
32from statistics import median
33
34from acts import utils
35from acts import asserts
36from acts import signals
37from acts.libs.proc import job
38from acts.controllers.adb_lib.error import AdbCommandError
39from acts.controllers.android_device import list_adb_devices
40from acts.controllers.android_device import list_fastboot_devices
41from acts.controllers.android_device import DEFAULT_QXDM_LOG_PATH
42from acts.controllers.android_device import SL4A_APK_NAME
43from acts_contrib.test_utils.gnss.gnss_measurement import GnssMeasurement
44from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
45from acts_contrib.test_utils.tel import tel_logging_utils as tlutils
46from acts_contrib.test_utils.tel import tel_test_utils as tutils
47from acts_contrib.test_utils.gnss import device_doze
48from acts_contrib.test_utils.gnss import gnssstatus_utils
49from acts_contrib.test_utils.gnss import gnss_constant
50from acts_contrib.test_utils.gnss import supl
51from acts_contrib.test_utils.instrumentation.device.command.instrumentation_command_builder import InstrumentationCommandBuilder
52from acts_contrib.test_utils.instrumentation.device.command.instrumentation_command_builder import InstrumentationTestCommandBuilder
53from acts.utils import get_current_epoch_time
54from acts.utils import epoch_to_human_time
55from acts_contrib.test_utils.gnss.gnss_defines import BCM_GPS_XML_PATH
56from acts_contrib.test_utils.gnss.gnss_defines import BCM_NVME_STO_PATH
57
# Alias so this module can refer to the Wi-Fi enums without the wutils prefix.
WifiEnums = wutils.WifiEnums
# NOTE(review): presumably the maximum wait (seconds) for a first fix; it is
# not referenced in the visible part of this file - confirm against callers.
FIRST_FIXED_MAX_WAITING_TIME = 60
# NOTE(review): looks like a prefix for result lines uploaded to Sponge; not
# referenced in the visible part of this file - confirm.
UPLOAD_TO_SPONGE_PREFIX = "TestResult "
# Timeout handed to ad.adb.pull when pulling logs (presumably seconds).
PULL_TIMEOUT = 300
# On-device GPSTool directory: its *.txt files are deleted by
# clear_logd_gnss_qxdm_log and its contents are pulled by get_gnss_qxdm_log.
GNSSSTATUS_LOG_PATH = (
    "/storage/emulated/0/Android/data/com.android.gpstool/files/")
# NOTE(review): presumably candidate QXDM log mask file names - confirm usage.
QXDM_MASKS = ["GPS.cfg", "GPS-general.cfg", "default.cfg"]
# Record type holding the fields of one TTFF report entry.
TTFF_REPORT = namedtuple(
    "TTFF_REPORT", "utc_time ttff_loop ttff_sec ttff_pe ttff_ant_cn "
                   "ttff_base_cn ttff_haccu")
# Record type holding the fields of one tracking report entry.
TRACK_REPORT = namedtuple(
    "TRACK_REPORT", "l5flag pe ant_top4cn ant_cn base_top4cn base_cn device_time report_time")
# Log-tag lines appended to /data/local.prop by enable_gnss_verbose_logging.
# Note that the text does not end with a newline.
LOCAL_PROP_FILE_CONTENTS = """\
log.tag.LocationManagerService=VERBOSE
log.tag.GnssLocationProvider=VERBOSE
log.tag.GnssMeasurementsProvider=VERBOSE
log.tag.GpsNetInitiatedHandler=VERBOSE
log.tag.GnssNetInitiatedHandler=VERBOSE
log.tag.GnssNetworkConnectivityHandler=VERBOSE
log.tag.ConnectivityService=VERBOSE
log.tag.ConnectivityManager=VERBOSE
log.tag.GnssVisibilityControl=VERBOSE
log.tag.NtpTimeHelper=VERBOSE
log.tag.NtpTrustedTime=VERBOSE
log.tag.GnssPsdsDownloader=VERBOSE
log.tag.Gnss=VERBOSE
log.tag.GnssConfiguration=VERBOSE"""
# Extra log-tag lines that enable_gnss_verbose_logging adds for wearables.
LOCAL_PROP_FILE_CONTENTS_FOR_WEARABLE = """\
log.tag.ImsPhone=VERBOSE
log.tag.GsmCdmaPhone=VERBOSE
log.tag.Phone=VERBOSE
log.tag.GCoreFlp=VERBOSE"""
# NOTE(review): package name of the Maps app, presumably used as the app under
# test; not referenced in the visible part of this file - confirm.
TEST_PACKAGE_NAME = "com.google.android.apps.maps"
# NOTE(review): presumably the permissions granted to TEST_PACKAGE_NAME -
# confirm against callers.
LOCATION_PERMISSIONS = [
    "android.permission.ACCESS_FINE_LOCATION",
    "android.permission.ACCESS_COARSE_LOCATION"
]
# Package name of the GTW GPSTool app.
GNSSTOOL_PACKAGE_NAME = "com.android.gpstool"
# NOTE(review): presumably the permissions granted to the GPSTool app -
# confirm against callers.
GNSSTOOL_PERMISSIONS = [
    "android.permission.ACCESS_FINE_LOCATION",
    "android.permission.READ_EXTERNAL_STORAGE",
    "android.permission.ACCESS_COARSE_LOCATION",
    "android.permission.CALL_PHONE",
    "android.permission.WRITE_CONTACTS",
    "android.permission.CAMERA",
    "android.permission.WRITE_EXTERNAL_STORAGE",
    "android.permission.READ_CONTACTS",
    "android.permission.ACCESS_BACKGROUND_LOCATION"
]
# Config text that points every PSDS server entry at an empty "http://" URL.
# NOTE(review): presumably written to a GPS config file to disable LTO -
# the consumer is outside the visible part of this file.
DISABLE_LTO_FILE_CONTENTS = """\
LONGTERM_PSDS_SERVER_1="http://"
LONGTERM_PSDS_SERVER_2="http://"
LONGTERM_PSDS_SERVER_3="http://"
NORMAL_PSDS_SERVER="http://"
REALTIME_PSDS_SERVER="http://"
"""
# Same idea using the XTRA_SERVER_* keys.
# NOTE(review): the "_R" suffix presumably refers to Android R - confirm.
DISABLE_LTO_FILE_CONTENTS_R = """\
XTRA_SERVER_1="http://"
XTRA_SERVER_2="http://"
XTRA_SERVER_3="http://"
"""
# Matches a line containing "PGLOR,<digits>,STA".
_BRCM_DUTY_CYCLE_PATTERN = re.compile(r".*PGLOR,\d+,STA.*")
# Matches text containing "init.svc.qcom" ("." matches any character here).
_WEARABLE_QCOM_VENDOR_REGEX = re.compile(r"init.svc.qcom")
# NOTE(review): tolerance for a GPS elapsed-realtime difference; the unit is
# not visible in this part of the file - confirm.
_GPS_ELAPSED_REALTIME_DIFF_TOLERANCE = 500_000
122
class GnssTestUtilsError(Exception):
    """Exception type specific to the GNSS test utilities in this module."""
125
126
def remount_device(ad):
    """Remount the device file system as read-write.

    Makes up to five attempts. Each attempt restarts adb as root, disables
    verity and reboots the device when "ro.boot.veritymode" reads
    "enforcing", then issues a remount. The loop stops at the first attempt
    whose output contains "remount succeeded"; if no attempt succeeds the
    function returns without raising.

    Args:
        ad: An AndroidDevice object.
    """
    max_attempts = 5
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        ad.root_adb()
        verity_mode = ad.adb.getprop("ro.boot.veritymode")
        if verity_mode == "enforcing":
            # Remount cannot work while verity is enforced; the change only
            # takes effect after a reboot.
            ad.adb.disable_verity()
            reboot(ad)
        outcome = ad.adb.remount()
        ad.log.info("Attempt {:d} - {}".format(attempt, outcome))
        if "remount succeeded" in outcome:
            return
142
143
def reboot(ad):
    """Reboot the device, then restore mobile data and sync device time.

    After the reboot the screen is unlocked, mobile data is switched on if
    it is found to be off, and utils.sync_device_time is called.

    Args:
        ad: An AndroidDevice object.
    """
    ad.log.info("Reboot device to make changes take effect.")
    # TODO(diegowchung): remove the timeout setting after p23 back to normal
    reboot_timeout = 600
    ad.reboot(timeout=reboot_timeout)
    ad.unlock_screen(password=None)
    mobile_data_enabled = is_mobile_data_on(ad)
    if not mobile_data_enabled:
        set_mobile_data(ad, True)
    utils.sync_device_time(ad)
157
158
def enable_gnss_verbose_logging(ad):
    """Enable GNSS VERBOSE logging and persistent logcat.

    Remounts the device, appends the vendor-specific debug setting
    (gps.conf for Qualcomm, libgps.conf otherwise), appends the log-tag
    lines to /data/local.prop and sets the logd persistence properties.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    ad.log.info("Enable GNSS VERBOSE Logging and persistent logcat.")
    if check_chipset_vendor_by_qualcomm(ad):
        ad.adb.shell("echo -e '\nDEBUG_LEVEL = 5' >> /vendor/etc/gps.conf")
    else:
        ad.adb.shell("echo LogEnabled=true >> /data/vendor/gps/libgps.conf")
        ad.adb.shell("chown gps.system /data/vendor/gps/libgps.conf")
    prop_contents = LOCAL_PROP_FILE_CONTENTS
    if is_device_wearable(ad):
        # Neither constant ends with a newline, so they must be joined with
        # one; plain concatenation fuses the last base entry with the first
        # wearable entry into a single invalid line.
        prop_contents = "\n".join(
            (LOCAL_PROP_FILE_CONTENTS, LOCAL_PROP_FILE_CONTENTS_FOR_WEARABLE))
    # %r wraps the text in quotes and escapes the newlines for the shell.
    ad.adb.shell("echo %r >> /data/local.prop" % prop_contents)
    ad.adb.shell("chmod 644 /data/local.prop")
    ad.adb.shell("setprop persist.logd.logpersistd.size 20000")
    ad.adb.shell("setprop persist.logd.size 16777216")
    ad.adb.shell("setprop persist.vendor.radio.adb_log_on 1")
    ad.adb.shell("setprop persist.logd.logpersistd logcatd")
    ad.adb.shell("setprop log.tag.copresGcore VERBOSE")
    ad.adb.shell("sync")
184
185
def get_am_flags(value):
    """Return the (value, type) string pair for a given python value.

    Args:
        value: A bool or str flag value.

    Returns:
        ("true" or "false", "boolean") for a bool, (value, "string") for a
        str.

    Raises:
        ValueError: If value is neither a bool nor a str.
    """
    if isinstance(value, bool):
        return str(value).lower(), 'boolean'
    if isinstance(value, str):
        return value, 'string'
    # Wrap in a 1-tuple so a tuple input is formatted as one value instead of
    # being unpacked as format arguments (which raised TypeError).
    raise ValueError("%s should be either 'boolean' or 'string'" % (value,))
193
194
def enable_compact_and_particle_fusion_log(ad):
    """Enable CompactLog, FLP particle fusion log and disable gms
    location-based quake monitoring.

    Broadcasts one phenotype FLAG_OVERRIDE intent per flag (failures are
    ignored), then force-stops GMS and broadcasts INITIALIZE.

    Args:
        ad: An AndroidDevice object.
    """
    ad.root_adb()
    ad.log.info("Enable FLP flags and Disable GMS location-based quake "
                "monitoring.")
    overrides = {
        'compact_log_enabled': True,
        'flp_use_particle_fusion': True,
        'flp_particle_fusion_extended_bug_report': True,
        'flp_event_log_size': '86400',
        'proks_config': '28',
        'flp_particle_fusion_bug_report_window_sec': '86400',
        'flp_particle_fusion_bug_report_max_buffer_size': '86400',
        'seismic_data_collection': False,
        'Ealert__enable': False,
    }
    for flag, python_value in overrides.items():
        value, value_type = get_am_flags(python_value)
        # "\\*" yields a literal backslash-star so the device shell does not
        # glob-expand the user wildcard; a bare "\*" is an invalid escape.
        cmd = ("am broadcast -a com.google.android.gms.phenotype.FLAG_OVERRIDE "
               "--es package com.google.android.location --es user \\* "
               "--esa flags %s --esa values %s --esa types %s "
               "com.google.android.gms" % (flag, value, value_type))
        ad.adb.shell(cmd, ignore_status=True)
    ad.adb.shell("am force-stop com.google.android.gms")
    ad.adb.shell("am broadcast -a com.google.android.gms.INITIALIZE")
225
226
def disable_xtra_throttle(ad):
    """Turn off XTRA throttling so XTRA data downloads are not limited.

    Remounts the device and appends the XTRA test/throttle settings to
    /vendor/etc/gps.conf.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    ad.log.info("Disable XTRA Throttle.")
    for setting in ("XTRA_TEST_ENABLED=1", "XTRA_THROTTLE_ENABLED=0"):
        ad.adb.shell("echo -e '\n%s' >> /vendor/etc/gps.conf" % setting)
237
238
def enable_supl_mode(ad):
    """Switch SUPL back on for the next test item.

    Remounts the device and appends SUPL_MODE=1 to /etc/gps_debug.conf.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    ad.log.info("Enable SUPL mode.")
    supl_setting = "SUPL_MODE=1"
    ad.adb.shell("echo -e '\n{}' >> /etc/gps_debug.conf".format(supl_setting))
248
249
def disable_supl_mode(ad):
    """Switch SUPL off for XTRA/LTO-only test items.

    Remounts the device and appends SUPL_MODE=0 to /etc/gps_debug.conf. On
    non-Qualcomm devices SUPL over Wi-Fi is turned off as well.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    ad.log.info("Disable SUPL mode.")
    ad.adb.shell("echo -e '\n%s' >> /etc/gps_debug.conf" % "SUPL_MODE=0")
    if check_chipset_vendor_by_qualcomm(ad):
        return
    supl.set_supl_over_wifi_state(ad, False)
261
262
def enable_vendor_orbit_assistance_data(ad):
    """Turn on the vendor's orbit assistance feature.

    Qualcomm: disable the XTRA throttle and reboot.
    Other vendors: enable LTO, through the wearable variant on wearables.

    Args:
        ad: An AndroidDevice object.
    """
    ad.root_adb()
    if check_chipset_vendor_by_qualcomm(ad):
        disable_xtra_throttle(ad)
        reboot(ad)
        return
    set_lto = lto_mode_wearable if is_device_wearable(ad) else lto_mode
    set_lto(ad, True)
279
280
def disable_vendor_orbit_assistance_data(ad):
    """Turn off the vendor's orbit assistance feature.

    Qualcomm: disable XTRA.
    Other vendors: disable LTO, through the wearable variant on wearables.

    Args:
        ad: An AndroidDevice object.
    """
    ad.root_adb()
    if check_chipset_vendor_by_qualcomm(ad):
        disable_qualcomm_orbit_assistance_data(ad)
        return
    set_lto = lto_mode_wearable if is_device_wearable(ad) else lto_mode
    set_lto(ad, False)
297
def gla_mode(ad, state: bool):
    """Switch the Google Location Accuracy (GLA) feature on or off.

    Writes the global "assisted_gps_enabled" setting, then reads it back
    and logs the mode that is now in effect.

    Args:
        ad: An AndroidDevice object.
        state: True to enable GLA, False to disable GLA.
    """
    ad.root_adb()
    if state:
        setting, mode_name = 1, "MS_BASED"
    else:
        setting, mode_name = 0, "standalone"
    ad.adb.shell('settings put global assisted_gps_enabled %d' % setting)
    ad.log.info("Modify current GLA Mode to %s mode" % mode_name)

    current = int(ad.adb.shell("settings get global assisted_gps_enabled"))
    ad.log.info("GLA is enabled, MS_BASED mode" if current == 1
                else "GLA is disabled, standalone mode")
318
319
def disable_qualcomm_orbit_assistance_data(ad):
    """Disable assistance features for Qualcomm projects.

    Kills the xtra-daemon process; a failing kill command is ignored.

    Args:
        ad: An AndroidDevice object.
    """
    kill_cmd = "killall xtra-daemon"
    ad.log.info("Disable XTRA-daemon until next reboot.")
    ad.adb.shell(kill_cmd, ignore_status=True)
328
329
def disable_private_dns_mode(ad):
    """Turn private DNS mode off unless it is already off.

    Due to b/118365122 it is better to disable private DNS mode while
    testing: the 8.8.8.8 private DNS server is unstable and sometimes
    suddenly stops responding to DNS queries.

    Args:
        ad: An AndroidDevice object.
    """
    tutils.get_operator_name(ad.log, ad, subId=None)
    current_mode = ad.adb.shell("settings get global private_dns_mode")
    if current_mode == "off":
        return
    ad.log.info("Disable Private DNS mode.")
    ad.adb.shell("settings put global private_dns_mode off")
342
343
def _init_device(ad):
    """Init GNSS test devices.

    Runs the device setup steps below in a fixed order.

    Args:
        ad: An AndroidDevice object.
    """
    # Turn location on; raises if the location mode check fails.
    check_location_service(ad)
    enable_gnss_verbose_logging(ad)
    # Only pushes an overlay on non-Qualcomm devices.
    prepare_gps_overlay(ad)
    set_screen_always_on(ad)
    ad.log.info("Setting Bluetooth state to False")
    ad.droid.bluetoothToggleState(False)
    # Bluetooth itself is off, but Wi-Fi/BLE scanning stays enabled.
    set_wifi_and_bt_scanning(ad, True)
    disable_private_dns_mode(ad)
    # Re-installs the GPSTool apk pulled from the device.
    init_gtw_gpstool(ad)
    if is_device_wearable(ad):
        disable_battery_defend(ad)
361
362
def prepare_gps_overlay(ad):
    """Push a generated gps_overlay.xml to non-Qualcomm devices.

    Sets the pixellogger gps log mask to resolve gps logs being
    unreplayable on brcm vendor devices. The temporary host directory that
    holds the generated xml is removed whether or not the push succeeds.

    Args:
        ad: An AndroidDevice object.
    """
    if check_chipset_vendor_by_qualcomm(ad):
        return
    device_overlay_path = "/data/vendor/gps/overlay/gps_overlay.xml"
    local_xml = generate_gps_overlay_xml(ad)
    try:
        ad.log.info("Push gps_overlay to device")
        ad.adb.push(local_xml, device_overlay_path)
        ad.adb.shell("chmod 777 %s" % device_overlay_path)
    finally:
        # The xml lives alone in a temp directory; drop the whole directory.
        parent_of_xml = os.path.join(local_xml, os.pardir)
        shutil.rmtree(os.path.abspath(parent_of_xml))
377
378
def generate_gps_overlay_xml(ad):
    """Write a gps_overlay.xml into a fresh temporary directory.

    The <gll> element always carries EnableOnChipStopNotification="true".
    On non-wearable devices it also carries the LOG_DEBUG priority masks
    and the GLLIO/GLLAPI/NMEA/RAWDATA facility masks (the original notes
    call these the 'Replayable debug' and, for r11 devices, 'Replayable
    default' settings).

    Args:
        ad: An AndroidDevice object.

    Returns:
        path to the xml file
    """
    gll_attrib = {"EnableOnChipStopNotification": "true"}
    if not is_device_wearable(ad):
        replayable_facilities = "LOG_GLLIO | LOG_GLLAPI | LOG_NMEA | LOG_RAWDATA"
        gll_attrib.update({
            "LogPriMask": "LOG_DEBUG",
            "LogFacMask": replayable_facilities,
            "OnChipLogPriMask": "LOG_DEBUG",
            "OnChipLogFacMask": replayable_facilities,
        })

    output_dir = tempfile.mkdtemp()
    xml_path = os.path.join(output_dir, "gps_overlay.xml")

    glgps = ElementTree.Element('glgps', {
        "xmlns": "http://www.glpals.com/",
        "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
        "xsi:schemaLocation": "http://www.glpals.com/ glconfig.xsd",
    })

    ad.log.debug("Sub attrib is %s", gll_attrib)

    ElementTree.SubElement(glgps, 'gll', gll_attrib)

    document = ElementTree.ElementTree(glgps)
    document.write(xml_path, xml_declaration=True, encoding="utf-8",
                   method="xml")
    return xml_path
414
415
def connect_to_wifi_network(ad, network):
    """Connect to an open or psk Wi-Fi network.

    Resets Wi-Fi, makes sure the SSID shows up in a scan, then tries up to
    five times to connect and validate the connection.

    Args:
        ad: An AndroidDevice object.
        network: Dictionary with network info.

    Raises:
        signals.TestError: If no attempt yields a validated connection.
    """
    target_ssid = network[WifiEnums.SSID_KEY]
    ad.ed.clear_all_events()
    wutils.reset_wifi(ad)
    wutils.start_wifi_connection_scan_and_ensure_network_found(
        ad, target_ssid)
    attempts_left = 5
    while attempts_left:
        attempts_left -= 1
        wutils.wifi_connect(ad, network, check_connectivity=False)
        # Validates wifi connection with ping_gateway=False to avoid issue like
        # b/254913994.
        connected = wutils.validate_connection(ad, ping_gateway=False)
        if connected:
            ad.log.info("WiFi connection is validated")
            return
    raise signals.TestError("Failed to connect WiFi")
435
def set_wifi_and_bt_scanning(ad, state=True):
    """Toggle Wi-Fi and Bluetooth scanning (Settings -> Location).

    Writes the global "wifi_scan_always_enabled" and
    "ble_scan_always_enabled" settings.

    Args:
        ad: An AndroidDevice object.
        state: True to turn on "Wi-Fi and Bluetooth scanning".
            False to turn off "Wi-Fi and Bluetooth scanning".
    """
    ad.root_adb()
    flag = "1" if state else "0"
    for key in ("wifi_scan_always_enabled", "ble_scan_always_enabled"):
        ad.adb.shell("settings put global %s %s" % (key, flag))
    outcome = "enabled" if state else "disabled"
    ad.log.info("Wi-Fi and Bluetooth scanning are %s" % outcome)
453
454
def check_location_service(ad):
    """Turn the location service on and verify that it is on.

    Args:
        ad: An AndroidDevice object.

    Raises:
        signals.TestError: If the secure "location_mode" setting is not 3
            after location has been enabled.
    """
    remount_device(ad)
    utils.set_location_service(ad, True)
    ad.adb.shell("cmd location set-location-enabled true")
    raw_mode = ad.adb.shell("settings get secure location_mode")
    location_mode = int(raw_mode)
    ad.log.info("Current Location Mode >> {:d}".format(location_mode))
    if location_mode == 3:
        return
    raise signals.TestError("Failed to turn Location on")
469
470
def delete_device_folder(ad, folder):
    """Log the contents of a device folder, then remove it recursively.

    Failures of the listing and of the removal are both ignored.

    Args:
        ad: An AndroidDevice object.
        folder: Path of the folder on the device.
    """
    ad.log.info("Folder to be deleted: {}".format(folder))
    listing = ad.adb.shell("ls %s" % folder, ignore_status=True)
    ad.log.debug("Contents to be deleted: {}".format(listing))
    ad.adb.shell(f"rm -rf {folder}", ignore_status=True)
476
477
def remove_pixel_logger_folder(ad):
    """Delete the PixelLogger log folder that matches the chipset vendor.

    Qualcomm devices use the "diag_logs" folder, all others the "gps/"
    folder under the PixelLogger logs directory.

    Args:
        ad: An AndroidDevice object.
    """
    logs_root = "/sdcard/Android/data/com.android.pixellogger/files/logs/"
    subfolder = "diag_logs" if check_chipset_vendor_by_qualcomm(ad) else "gps/"
    delete_device_folder(ad, logs_root + subfolder)
485
486
def clear_logd_gnss_qxdm_log(ad):
    """Remove logs left over from the previous test item.

    Deletes the *.txt files under the GPSTool files directory, the
    /data/misc/logd folder, the vendor log folder (QXDM logs on Qualcomm,
    /data/vendor/gps/logs otherwise) and the PixelLogger folder. Devices
    that are not wearables are rebooted afterwards.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    ad.log.info("Clear Logd, GNSS and PixelLogger Log from previous test item.")
    ad.adb.shell(
        'find {} -name "*.txt" -type f -delete'.format(GNSSSTATUS_LOG_PATH),
        ignore_status=True)
    if check_chipset_vendor_by_qualcomm(ad):
        vendor_log_folder = posixpath.join(DEFAULT_QXDM_LOG_PATH, "logs")
    else:
        vendor_log_folder = "/data/vendor/gps/logs"
    for stale_folder in ("/data/misc/logd", vendor_log_folder):
        delete_device_folder(ad, stale_folder)
    remove_pixel_logger_folder(ad)
    if is_device_wearable(ad):
        return
    reboot(ad)
512
513
def get_gnss_qxdm_log(ad, qdb_path=None):
    """Pull the GnssStatus and PixelLogger logs to the host and zip them.

    Each log set is pulled into its own folder under ad.device_log_path,
    archived as a zip next to that folder, and the folder is then removed.
    On Qualcomm devices the first qdb file that can be pulled is added to
    the PixelLogger folder before archiving.

    Args:
        ad: An AndroidDevice object.
        qdb_path: Candidate device paths of qdsp6m.qdb on different
            projects, tried in order. May be None (no qdb is pulled), a
            single path string, or an iterable of path strings.
    """
    log_path = ad.device_log_path
    os.makedirs(log_path, exist_ok=True)
    gnss_log_name = "gnssstatus_log_%s_%s" % (ad.model, ad.serial)
    gnss_log_path = posixpath.join(log_path, gnss_log_name)
    os.makedirs(gnss_log_path, exist_ok=True)
    ad.log.info("Pull GnssStatus Log to %s" % gnss_log_path)
    # The trailing "." pulls the directory contents, not the directory itself.
    ad.adb.pull("%s %s" % (GNSSSTATUS_LOG_PATH + ".", gnss_log_path),
                timeout=PULL_TIMEOUT, ignore_status=True)
    shutil.make_archive(gnss_log_path, "zip", gnss_log_path)
    shutil.rmtree(gnss_log_path, ignore_errors=True)
    is_qualcomm = check_chipset_vendor_by_qualcomm(ad)
    if is_qualcomm:
        output_path = (
            "/sdcard/Android/data/com.android.pixellogger/files/logs/"
            "diag_logs/.")
    else:
        output_path = (
            "/sdcard/Android/data/com.android.pixellogger/files/logs/gps/.")
    qxdm_log_name = "PixelLogger_%s_%s" % (ad.model, ad.serial)
    qxdm_log_path = posixpath.join(log_path, qxdm_log_name)
    os.makedirs(qxdm_log_path, exist_ok=True)
    ad.log.info("Pull PixelLogger Log %s to %s" % (output_path,
                                                   qxdm_log_path))
    ad.adb.pull("%s %s" % (output_path, qxdm_log_path),
                timeout=PULL_TIMEOUT, ignore_status=True)
    if is_qualcomm:
        # qdb_path defaults to None; iterating it directly raised TypeError,
        # and a bare string would be iterated character by character.
        if qdb_path is None:
            qdb_candidates = []
        elif isinstance(qdb_path, str):
            qdb_candidates = [qdb_path]
        else:
            qdb_candidates = qdb_path
        for path in qdb_candidates:
            output = ad.adb.pull("%s %s" % (path, qxdm_log_path),
                                 timeout=PULL_TIMEOUT, ignore_status=True)
            # Stop at the first candidate that exists on the device.
            if "No such file or directory" not in output:
                break
    shutil.make_archive(qxdm_log_path, "zip", qxdm_log_path)
    shutil.rmtree(qxdm_log_path, ignore_errors=True)
555
556
def set_mobile_data(ad, state):
    """Switch mobile data on or off and log the resulting state.

    Wearables are switched through the global "cell_on" setting, other
    devices through the telephony RPC call. After a five second wait the
    actual state is read back; a mismatch is logged as an error and does
    not raise.

    Args:
        ad: An AndroidDevice object.
        state: True to enable mobile data. False to disable mobile data.
    """
    ad.root_adb()
    enable = bool(state)
    action = "Enable" if enable else "Disable"
    if is_device_wearable(ad):
        ad.log.info("%s wearable mobile data." % action)
        ad.adb.shell("settings put global cell_on %d" % enable)
    else:
        ad.log.info("%s mobile data via RPC call." % action)
        ad.droid.telephonyToggleDataConnection(enable)
    time.sleep(5)
    ret_val = is_mobile_data_on(ad)
    if enable != bool(ret_val):
        ad.log.error("Mobile data is at unknown state and set to %s" % ret_val)
        return
    outcome = "enabled" if enable else "disabled"
    ad.log.info("Mobile data is " + outcome + " and set to %s" % ret_val)
587
588
def gnss_trigger_modem_ssr_by_adb(ad, dwelltime=60):
    """Trigger a modem SSR crash through adb and verify it in logcat.

    The crash commands are tried in order until one does not report
    "No such file or directory". After waiting, logcat is searched for an
    SSRObserver entry that reports a modem crash reason.

    Args:
        ad: An AndroidDevice object.
        dwelltime: Waiting time for modem reset. Default is 60 seconds.

    Returns:
        True if the modem crash is found in logcat.

    Raises:
        signals.TestError: If logcat has no SSRObserver entry, or none of
            the entries reports a modem crash.
    """
    begin_time = get_current_epoch_time()
    ad.root_adb()
    ssr_commands = (
        "echo restart > /sys/kernel/debug/msm_subsys/modem",
        r"echo 'at+cfun=1,1\r' > /dev/at_mdm0",
    )
    for ssr_command in ssr_commands:
        ad.log.info("Triggering modem SSR crash by %s" % ssr_command)
        output = ad.adb.shell(ssr_command, ignore_status=True)
        # A missing node means this command does not apply; try the next.
        if "No such file or directory" not in output:
            break
    time.sleep(dwelltime)
    ad.send_keycode("HOME")
    logcat_results = ad.search_logcat("SSRObserver", begin_time)
    if not logcat_results:
        raise signals.TestError("No SSRObserver found in logcat")
    for entry in logcat_results:
        message = entry["log_message"]
        if "mSubsystem='modem', mCrashReason" in message:
            ad.log.debug(message)
            ad.log.info("Triggering modem SSR crash successfully.")
            return True
    raise signals.TestError("Failed to trigger modem SSR crash")
622
623
def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60):
    """Trigger a modem SSR crash with the MDS tool and check the result.

    Args:
        ad: An AndroidDevice object.
        dwelltime: Waiting time for modem reset. Default is 60 seconds.

    Returns:
        ssr_crash_time: The epoch time taken just before the crash command
            is sent.

    Raises:
        signals.TestError: If the MDS tool is not installed, or the
            instrumentation output does not contain "SUCCESS".
    """
    if not ad.adb.shell("pm path com.google.mdstest"):
        raise signals.TestError("MDS Tool is not properly installed.")
    ad.root_adb()
    instrumentation = ("com.google.mdstest/com.google.mdstest.instrument"
                       ".ModemCommandInstrumentation")
    cmd = 'am instrument -w -e request "4b 25 03 00" "%s"' % instrumentation
    ad.log.info("Triggering modem SSR crash by MDS")
    ssr_crash_time = get_current_epoch_time()
    output = ad.adb.shell(cmd, ignore_status=True)
    ad.log.debug(output)
    # The wait happens before the output is checked, even on failure.
    time.sleep(dwelltime)
    ad.send_keycode("HOME")
    if "SUCCESS" not in output:
        raise signals.TestError(
            "Failed to trigger modem SSR crash by MDS. \n%s" % output)
    ad.log.info("Triggering modem SSR crash by MDS successfully.")
    return ssr_crash_time
654
655
def check_xtra_download(ad, begin_time):
    """Look in logcat for the XTRA/LTO download-and-inject success message.

    The message that is searched for depends on the chipset vendor and,
    for non-Qualcomm devices, on whether the device is a wearable.

    Args:
        ad: An AndroidDevice object.
        begin_time: test begin time

    Returns:
        True if the success message is found after begin_time, otherwise
        False.
    """
    ad.send_keycode("HOME")
    if check_chipset_vendor_by_qualcomm(ad):
        pattern = "XTRA download success. inject data into modem"
        label = "XTRA"
        failure = "XTRA downloaded FAIL."
    else:
        if is_device_wearable(ad):
            pattern = "GnssLocationProvider: calling native_inject_psds_data"
        else:
            pattern = "GnssPsdsAidl: injectPsdsData: psdsType: 1"
        label = "LTO"
        failure = "LTO downloaded and inject FAIL."
    matches = ad.search_logcat(pattern, begin_time)
    if not matches:
        ad.log.error(failure)
        return False
    ad.log.debug("%s" % matches[-1]["log_message"])
    ad.log.info("%s downloaded and injected successfully." % label)
    return True
689
690
def pull_package_apk(ad, package_name):
    """Pull the apk of an installed package into a new temp directory.

    Args:
        ad: An AndroidDevice object.
        package_name: Package name of apk to pull.

    Returns:
        The temp directory on the host that holds the pulled apk.

    Raises:
        signals.TestError: If "pm path" reports no apk for the package.
    """
    pm_output = ad.adb.shell("pm path %s" % package_name)
    match = re.search(r"package:(.*)", pm_output)
    if match is None:
        raise signals.TestError("Couldn't find apk of %s" % package_name)
    apk_source = match.group(1)
    ad.log.info("Get apk of %s from %s" % (package_name, apk_source))
    local_dir = tempfile.mkdtemp()
    ad.pull_files([apk_source], local_dir)
    return local_dir
711
712
def pull_gnss_cfg_file(ad, file):
    """Pull a gnss cfg file from the device into a new temp directory.

    Args:
        ad: An AndroidDevice object.
        file: CFG file in device to pull.

    Returns:
        The host path of the first *.cfg file found in the temp directory.

    Raises:
        signals.TestError: If no *.cfg file was pulled.
    """
    ad.root_adb()
    host_dest = tempfile.mkdtemp()
    ad.pull_files(file, host_dest)
    cfg_names = (name for name in os.listdir(host_dest)
                 if fnmatch.fnmatch(name, "*.cfg"))
    first_cfg = next(cfg_names, None)
    if first_cfg is None:
        raise signals.TestError("No cfg file is found in %s" % host_dest)
    return os.path.join(host_dest, first_cfg)
733
734
def reinstall_package_apk(ad, package_name, apk_path):
    """Re-install a package from a previously pulled apk.

    The first *.apk file found in apk_path is installed with adb install
    verification switched off. All tests are aborted when the package is
    not found on the device afterwards.

    Args:
        ad: An AndroidDevice object.
        package_name: Package name of apk.
        apk_path: The temp path of pulled apk.

    Raises:
        signals.TestError: If apk_path holds no *.apk file.
    """
    apk_names = fnmatch.filter(os.listdir(apk_path), "*.apk")
    if not apk_names:
        raise signals.TestError("No apk is found in %s" % apk_path)
    apk_file = os.path.join(apk_path, apk_names[0])
    ad.log.info("Re-install %s with path: %s" % (package_name, apk_file))
    ad.adb.shell("settings put global verifier_verify_adb_installs 0")
    ad.adb.install("-r -d -g --user 0 %s" % apk_file)
    if not ad.adb.shell("pm path %s" % package_name):
        tutils.abort_all_tests(
            ad.log, "%s is not properly re-installed." % package_name)
    ad.log.info("%s is re-installed successfully." % package_name)
757
758
def init_gtw_gpstool(ad):
    """Init GTW_GPSTool apk.

    Pulls the installed GPSTool apk from the device and re-installs it.
    The temporary host directory holding the apk is removed even when the
    re-install fails.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    gpstool_path = pull_package_apk(ad, GNSSTOOL_PACKAGE_NAME)
    try:
        reinstall_package_apk(ad, GNSSTOOL_PACKAGE_NAME, gpstool_path)
    finally:
        # Clean up on failure too, so the temp directory never leaks.
        shutil.rmtree(gpstool_path, ignore_errors=True)
769
770
def fastboot_factory_reset(ad, state=True):
    """Factory reset the device in fastboot mode.
       Pull sl4a apk from device. Terminate all sl4a sessions,
       Reboot the device to bootloader,
       factory reset the device by fastboot.
       Reboot the device. wait for device to complete booting
       Re-install and start an sl4a session.

    The reset sequence is tried up to three times; if the last attempt
    also fails, all tests are aborted.

    Args:
        ad: An AndroidDevice object.
        state: True for exit_setup_wizard, False for not exit_setup_wizard.

    Returns:
        True if factory reset process complete.
    """
    status = True
    # Number of reset attempts; the except handler compares against it.
    attempts = 3
    mds_path = ""
    gnss_cfg_file = ""
    gnss_cfg_path = "/vendor/etc/mdlog"
    default_gnss_cfg = "/vendor/etc/mdlog/DEFAULT+SECURITY+FULLDPL+GPS.cfg"
    # Save everything the reset wipes so it can be restored afterwards.
    sl4a_path = pull_package_apk(ad, SL4A_APK_NAME)
    gpstool_path = pull_package_apk(ad, "com.android.gpstool")
    if check_chipset_vendor_by_qualcomm(ad):
        mds_path = pull_package_apk(ad, "com.google.mdstest")
        gnss_cfg_file = pull_gnss_cfg_file(ad, default_gnss_cfg)
    stop_pixel_logger(ad)
    ad.stop_services()
    for i in range(1, attempts + 1):
        try:
            if ad.serial in list_adb_devices():
                ad.log.info("Reboot to bootloader")
                ad.adb.reboot("bootloader", ignore_status=True)
                time.sleep(10)
            if ad.serial in list_fastboot_devices():
                ad.log.info("Factory reset in fastboot")
                ad.fastboot._w(timeout=300, ignore_status=True)
                time.sleep(30)
                ad.log.info("Reboot in fastboot")
                ad.fastboot.reboot()
            ad.wait_for_boot_completion()
            ad.root_adb()
            if ad.skip_sl4a:
                break
            if ad.is_sl4a_installed():
                break
            if is_device_wearable(ad):
                ad.log.info("Wait 5 mins for wearable projects system busy time.")
                time.sleep(300)
            reinstall_package_apk(ad, SL4A_APK_NAME, sl4a_path)
            reinstall_package_apk(ad, "com.android.gpstool", gpstool_path)
            if check_chipset_vendor_by_qualcomm(ad):
                reinstall_package_apk(ad, "com.google.mdstest", mds_path)
                ad.push_system_file(gnss_cfg_file, gnss_cfg_path)
            time.sleep(10)
            break
        except Exception as e:
            ad.log.error(e)
            if i == attempts:
                tutils.abort_all_tests(ad.log, str(e))
            time.sleep(5)
    try:
        ad.start_adb_logcat()
    except Exception as e:
        # Best effort: a logcat start failure should not fail the reset.
        ad.log.error(e)
    if state:
        ad.exit_setup_wizard()
    if ad.skip_sl4a:
        return status
    tutils.bring_up_sl4a(ad)
    temp_dirs = [sl4a_path, gpstool_path, mds_path]
    if gnss_cfg_file:
        # gnss_cfg_file is a file inside its own temp directory; rmtree on
        # the file itself silently fails, so remove the directory instead.
        temp_dirs.append(os.path.dirname(gnss_cfg_file))
    for path in temp_dirs:
        if path:
            shutil.rmtree(path, ignore_errors=True)
    return status
843
844
def clear_aiding_data_by_gtw_gpstool(ad):
    """Clear all GNSS aiding data through GTW GPSTool.

    When the device is not reported as a Qualcomm chipset, the LTO file is
    deleted first. GPSTool is then started in "clear" mode and the function
    sleeps 10 seconds so the tool has time to finish clearing.

    Args:
        ad: An AndroidDevice object.
    """
    is_qualcomm = check_chipset_vendor_by_qualcomm(ad)
    if not is_qualcomm:
        delete_lto_file(ad)
    clear_cmd = "am start -S -n com.android.gpstool/.GPSTool --es mode clear"
    ad.log.info("Launch GTW GPSTool and Clear all GNSS aiding data")
    ad.adb.shell(clear_cmd)
    # Give GPSTool time to clear the aiding data before the caller goes on.
    time.sleep(10)
858
859
def start_gnss_by_gtw_gpstool(ad,
                              state,
                              api_type="gnss",
                              bgdisplay=False,
                              freq=0,
                              lowpower=False,
                              meas=False):
    """Start or stop a location session on GTW_GPSTool.

    Starting launches the GPSTool activity in "gps" mode with the requested
    options passed as intent extras. Stopping sends the stop_gps_action
    broadcast. Either way the function sleeps 3 seconds after the command.

    Args:
        ad: An AndroidDevice object.
        state: True to start GNSS. False to stop GNSS.
        api_type: Different API for location fix. Use gnss/flp/nmea
        bgdisplay: Value of the "BG" extra; true to run GTW when display is
          off, false to not run GTW when display is off.
        freq: An integer to set location update frequency.
        lowpower: A boolean to set GNSS LowPowerMode.
        meas: A boolean to set GNSS measurement registration.
    """
    if state:
        extras = ("--es type {} --ei freq {} --ez BG {} --ez meas {} --ez "
                  "lowpower {}").format(api_type, freq, bgdisplay, meas, lowpower)
        shell_cmd = ("am start -S -n com.android.gpstool/.GPSTool --es mode gps"
                     + " " + extras)
    else:
        ad.log.info("Stop %s on GTW_GPSTool." % api_type)
        shell_cmd = "am broadcast -a com.android.gpstool.stop_gps_action"
    # Failures of the shell command are tolerated (ignore_status=True).
    ad.adb.shell(shell_cmd, ignore_status=True, timeout=300)
    time.sleep(3)
889
890
def process_gnss_by_gtw_gpstool(ad,
                                criteria,
                                api_type="gnss",
                                clear_data=True,
                                meas_flag=False,
                                freq=0,
                                bg_display=False):
    """Start tracking on GTW_GPSTool and wait for the first fix.

    Up to 3 attempts are made. Each attempt optionally clears the aiding
    data, starts GPSTool and then polls logcat once per second, for
    (10 + criteria) polls, looking for a "First fixed" message.

    Args:
        ad: An AndroidDevice object.
        criteria: Criteria (in seconds) for current test item.
        api_type: Different API for location fix. Use gnss/flp/nmea
        clear_data: True to clear GNSS aiding data before each attempt.
          Default set to True.
        meas_flag: True to enable GnssMeasurement. Default set to False.
        freq: An integer to set location update frequency. Default set to 0.
        bg_display: To enable GPS tool bg display or not

    Returns:
        First fix datetime obj

    Raises:
        signals.TestFailure: when the first fix takes longer than criteria,
          or when no first fix is seen in any attempt.
    """
    retries = 3
    for attempt in range(1, retries + 1):
        if not ad.is_adb_logcat_on:
            ad.start_adb_logcat()
        check_adblog_functionality(ad)
        check_location_runtime_permissions(
            ad, GNSSTOOL_PACKAGE_NAME, GNSSTOOL_PERMISSIONS)
        begin_time = get_current_epoch_time()
        if clear_data:
            clear_aiding_data_by_gtw_gpstool(ad)
        ad.log.info("Start %s on GTW_GPSTool - attempt %d" % (api_type.upper(),
                                                              attempt))
        start_gnss_by_gtw_gpstool(ad,
                                  state=True,
                                  api_type=api_type,
                                  meas=meas_flag,
                                  freq=freq,
                                  bgdisplay=bg_display)
        for _ in range(10 + criteria):
            logcat_results = ad.search_logcat("First fixed", begin_time)
            if not logcat_results:
                # Nothing reported yet; poll again in a second.
                time.sleep(1)
                continue
            latest_entry = logcat_results[-1]
            ad.log.debug(latest_entry["log_message"])
            # The last token of the message is the first-fix time in ms.
            first_fixed = int(latest_entry["log_message"].split()[-1])
            first_fixed_sec = first_fixed/1000
            ad.log.info("%s First fixed = %.3f seconds" %
                        (api_type.upper(), first_fixed_sec))
            if first_fixed_sec <= criteria:
                return latest_entry["datetime_obj"]
            # A fix arrived but too late: stop tracking and fail right away.
            start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
            raise signals.TestFailure(
                "Fail to get %s location fixed within %d seconds criteria."
                % (api_type.upper(), criteria))
        # No fix at all in this attempt: record the focused app and retry.
        check_current_focus_app(ad)
        start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
    raise signals.TestFailure(
        "Fail to get %s location fixed within %d attempts."
        % (api_type.upper(), retries))
950
951
def start_ttff_by_gtw_gpstool(ad,
                              ttff_mode,
                              iteration,
                              aid_data=False,
                              raninterval=False,
                              mininterval=10,
                              maxinterval=40,
                              hot_warm_sleep=300,
                              timeout=60):
    """Send the TTFF broadcast to GTW GPSTool and confirm the test started.

    The broadcast is sent up to 3 times. After each send, logcat is searched
    for the start_test_action message; the first attempt that finds it
    returns.

    Args:
        ad: An AndroidDevice object.
        ttff_mode: TTFF Test mode for current test item.
        iteration: Iteration of TTFF cycles.
        aid_data: Boolean for identify aid_data existed or not
        raninterval: Boolean for identify random interval of TTFF in enable or not.
        mininterval: Minimum value of random interval pool. The unit is second.
        maxinterval: Maximum value of random interval pool. The unit is second.
        hot_warm_sleep: Wait time (seconds) before a "hs"/"ws" run when
          aid_data is False.
        timeout: TTFF time out. The unit is second. Not referenced in this
          function's body.

    Returns:
        latest_start_time: (Datetime) the start time of latest successful TTFF

    Raises:
        job.TimeoutError: when the last send attempt times out.
        signals.TestError: when no start_test_action message is found after
          all attempts.
    """
    max_attempts = 3
    begin_time = get_current_epoch_time()
    ad.log.debug("[start_ttff] Search logcat start time: %s" % begin_time)
    if ttff_mode in ("hs", "ws") and not aid_data:
        ad.log.info("Wait {} seconds to start TTFF {}...".format(
            hot_warm_sleep, ttff_mode.upper()))
        time.sleep(hot_warm_sleep)
    start_banner = {
        "cs": "Start TTFF Cold Start...",
        "csa": "Start TTFF CSWith Assist...",
    }.get(ttff_mode)
    if start_banner is not None:
        ad.log.info(start_banner)
        time.sleep(3)
    ttff_cmd = ("am broadcast -a com.android.gpstool.ttff_action "
                "--es ttff {} --es cycle {}  --ez raninterval {} "
                "--ei mininterval {} --ei maxinterval {}").format(
                    ttff_mode, iteration, raninterval, mininterval,
                    maxinterval)
    for attempt in range(1, max_attempts + 1):
        try:
            ad.log.info(f"Before sending TTFF gms version is {get_gms_version(ad)}")
            ad.adb.shell(ttff_cmd)
        except job.TimeoutError:
            # Out of retries: let the timeout propagate to the caller.
            if attempt == max_attempts:
                raise
            # Timeouts are frequent on Qualcomm devices and the root cause is
            # unknown, so they are ignored here to let the test retry.
            ad.log.warn("Send TTFF command timeout.")
            ad.log.info(f"Current gms version is {get_gms_version(ad)}")
            # Wait 2 second to retry
            time.sleep(2)
            continue
        time.sleep(1)
        start_logs = ad.search_logcat(
            "act=com.android.gpstool.start_test_action", begin_time)
        if start_logs:
            ad.log.debug("TTFF start log %s" % start_logs)
            latest_start_time = max(entry['datetime_obj'] for entry in start_logs)
            ad.log.info("Send TTFF start_test_action successfully.")
            return latest_start_time
    # Every attempt finished without seeing the start message.
    check_current_focus_app(ad)
    raise signals.TestError("Fail to send TTFF start_test_action.")
1018
1019
def gnss_tracking_via_gtw_gpstool(ad,
                                  criteria,
                                  api_type="gnss",
                                  testtime=60,
                                  meas_flag=False,
                                  freq=0,
                                  is_screen_off=False):
    """Run a GNSS/FLP tracking test on GTW_GPSTool for testtime minutes.

    Gets a first fix, keeps tracking for the requested time (with the screen
    status set according to is_screen_off) and finally stops GPSTool.

    Args:
        ad: An AndroidDevice object.
        criteria: Criteria for current TTFF.
        api_type: Different API for location fix. Use gnss/flp/nmea
        testtime: Tracking test time in minutes. Default set to 60 minutes.
        meas_flag: True to enable GnssMeasurement. Default set to False.
        freq: An integer to set location update frequency. Default set to 0.
        is_screen_off: whether to turn off the screen during tracking; also
          passed to GPSTool as its bg_display option.

    Returns:
        The datetime obj of first fixed
    """
    first_fix_datetime = process_gnss_by_gtw_gpstool(ad,
                                                     criteria=criteria,
                                                     api_type=api_type,
                                                     meas_flag=meas_flag,
                                                     freq=freq,
                                                     bg_display=is_screen_off)
    ad.log.info("Start %s tracking test for %d minutes" % (api_type.upper(),
                                                           testtime))
    tracking_begin = get_current_epoch_time()
    with set_screen_status(ad, off=is_screen_off):
        wait_n_mins_for_gnss_tracking(ad, tracking_begin, testtime, api_type)
        ad.log.info("Successfully tested for %d minutes" % testtime)
    start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
    return first_fix_datetime
1054
1055
def wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type="gnss",
                                  ignore_hal_crash=False):
    """Block until the tracking time is over, checking for crashes meanwhile.

    Args:
        ad: An AndroidDevice object.
        begin_time: The start time of tracking (epoch time in milliseconds).
        testtime: Tracking test time in minutes.
        api_type: Different API for location fix. Use gnss/flp/nmea
        ignore_hal_crash: To ignore HAL crash error or not.
    """
    tracking_duration_ms = testtime * 60 * 1000
    while True:
        elapsed_ms = get_current_epoch_time() - begin_time
        if elapsed_ms >= tracking_duration_ms:
            break
        detect_crash_during_tracking(ad, begin_time, api_type, ignore_hal_crash)
        # Throttle the checks so the device is not flooded with requests and
        # stops responding.
        time.sleep(1)
1071
def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location,
                             raninterval: bool = False, mininterval: int = 10,
                             maxinterval: int = 40):
    """Run a GNSS TTFF test in the selected mode and verify the results.

    Args:
        ad: An AndroidDevice object.
        mode: "cs", "ws" or "hs"
        criteria: Criteria for the TTFF.
        test_cycle: Number of TTFF iterations to request.
        true_location: Reference coordinate used to compute position error.
        raninterval: Whether GPSTool should use a random interval.
        mininterval: Minimum value of the random interval pool.
        maxinterval: Maximum value of the random interval pool.

    Returns:
        ttff_data: A dict of all TTFF data.
    """
    # A first fix is acquired before the TTFF run. That warm-up fix is not
    # subject to any criteria, so the maximum waiting time is used for it.
    process_gnss_by_gtw_gpstool(ad, criteria=FIRST_FIXED_MAX_WAITING_TIME)
    started_at = start_ttff_by_gtw_gpstool(ad,
                                           mode,
                                           iteration=test_cycle,
                                           raninterval=raninterval,
                                           mininterval=mininterval,
                                           maxinterval=maxinterval)
    ttff_data = process_ttff_by_gtw_gpstool(ad, started_at, true_location)
    mode_name = gnss_constant.TTFF_MODE.get(mode)
    within_criteria = check_ttff_data(ad, ttff_data, mode_name, criteria)
    failure_message = ("TTFF %s fails to reach designated criteria: %d "
                       "seconds." % (mode_name, criteria))
    asserts.assert_true(within_criteria, failure_message)
    return ttff_data
1099
def parse_gtw_gpstool_log(ad, true_position, api_type="gnss", validate_gnssstatus=False):
    """Process GNSS/FLP API logs from GTW GPSTool and output track_data to
    test_run_info for ACTS plugin to parse and display on MobileHarness as
    Property.

    The newest "*.txt" file of at least 2000 bytes under GNSSSTATUS_LOG_PATH
    is read from the device and parsed line by line. Lines that contain
    "Read:" or "Time" but carry no recognizable timestamp are skipped.

    Args:
        ad: An AndroidDevice object.
        true_position: Coordinate as [latitude, longitude] to calculate
        position error.
        api_type: Different API for location fix. Use gnss/flp/nmea
        validate_gnssstatus: Validate gnssstatus or not

    Returns:
        A dict of location reported from GPSTool
            {<utc_time>: TRACK_REPORT, ...}

    Raises:
        signals.TestError: when no usable log file exists on the device, or
          when the log file contains no location report.
    """
    gnssstatus_count = 0
    test_logfile = ""
    track_data = {}
    ant_top4_cn = 0
    ant_cn = 0
    base_top4_cn = 0
    base_cn = 0
    track_lat = 0
    track_long = 0
    l5flag = "false"
    # Stays None until a "Read:" line with a timestamp has been parsed.
    device_time = None
    # Raw string: "\d" in a normal string literal is an invalid escape.
    gps_datetime_pattern = re.compile(
        r"(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\.\d{0,5})")
    gps_datetime_format = "%Y/%m/%d %H:%M:%S.%f"
    file_count = int(ad.adb.shell("find %s -type f -iname *.txt | wc -l"
                                  % GNSSSTATUS_LOG_PATH))
    if file_count != 1:
        ad.log.warn("%d API logs exist." % file_count)
    # "ls -tr" lists oldest first, so the last qualifying file wins below.
    dir_file = ad.adb.shell("ls -tr %s" % GNSSSTATUS_LOG_PATH).split()
    for path_key in dir_file:
        if fnmatch.fnmatch(path_key, "*.txt"):
            logpath = posixpath.join(GNSSSTATUS_LOG_PATH, path_key)
            out = ad.adb.shell("wc -c %s" % logpath)
            file_size = int(out.split(" ")[0])
            if file_size < 2000:
                ad.log.info("Skip log %s due to log size %d bytes" %
                            (path_key, file_size))
                continue
            test_logfile = logpath
    if not test_logfile:
        raise signals.TestError("Failed to get test log file in device.")
    lines = ad.adb.shell("cat %s" % test_logfile).split("\n")
    gnss_svid_container = gnssstatus_utils.GnssSvidContainer()
    for line in lines:
        if line.startswith('Fix'):
            try:
                gnss_status = gnssstatus_utils.GnssStatus(line)
                gnssstatus_count += 1
            except gnssstatus_utils.RegexParseException as e:
                ad.log.warn(e)
                continue

            gnss_svid_container.add_satellite(gnss_status)
            if validate_gnssstatus:
                gnss_status.validate_gnssstatus()

        # The "... Avg Top4" checks must precede the plain "... Avg" checks,
        # because the shorter text is a substring of the longer one.
        if "Antenna_History Avg Top4" in line:
            ant_top4_cn = float(line.split(":")[-1].strip())
        elif "Antenna_History Avg" in line:
            ant_cn = float(line.split(":")[-1].strip())
        elif "Baseband_History Avg Top4" in line:
            base_top4_cn = float(line.split(":")[-1].strip())
        elif "Baseband_History Avg" in line:
            base_cn = float(line.split(":")[-1].strip())
        elif "L5 used in fix" in line:
            l5flag = line.split(":")[-1].strip()
        elif "Latitude" in line:
            track_lat = float(line.split(":")[-1].strip())
        elif "Longitude" in line:
            track_long = float(line.split(":")[-1].strip())
        elif "Read:" in line:
            target = gps_datetime_pattern.search(line)
            if target is None:
                # No timestamp on this line; keep the previous device time.
                continue
            device_time = datetime.strptime(target.group(1), gps_datetime_format)
        elif "Time" in line:
            target = gps_datetime_pattern.search(line)
            if target is None:
                # "Time" is a broad match; ignore lines without a timestamp.
                continue
            track_utc = target.group(1)
            report_time = datetime.strptime(track_utc, gps_datetime_format)
            if track_utc in track_data:
                continue
            # A report is emitted on each new "Time" line, using the most
            # recently parsed values of every other field.
            pe = calculate_position_error(track_lat, track_long, true_position)
            track_data[track_utc] = TRACK_REPORT(l5flag=l5flag,
                                                 pe=pe,
                                                 ant_top4cn=ant_top4_cn,
                                                 ant_cn=ant_cn,
                                                 base_top4cn=base_top4_cn,
                                                 base_cn=base_cn,
                                                 device_time=device_time,
                                                 report_time=report_time,
                                                 )
    ad.log.info("Total %d gnssstatus samples verified" %gnssstatus_count)
    ad.log.debug(track_data)
    if not track_data:
        # Without this guard the summary below fails with IndexError /
        # ZeroDivisionError, which hides the real cause.
        raise signals.TestError(
            "No location report found in log file %s." % test_logfile)
    prop_basename = UPLOAD_TO_SPONGE_PREFIX + f"{api_type.upper()}_tracking_"
    time_list = sorted(track_data.keys())
    l5flag_list = [track_data[key].l5flag for key in time_list]
    pe_list = [float(track_data[key].pe) for key in time_list]
    ant_top4cn_list = [float(track_data[key].ant_top4cn) for key in time_list]
    ant_cn_list = [float(track_data[key].ant_cn) for key in time_list]
    base_top4cn_list = [float(track_data[key].base_top4cn) for key in time_list]
    base_cn_list = [float(track_data[key].base_cn) for key in time_list]
    ad.log.info(prop_basename+"StartTime %s" % time_list[0].replace(" ", "-"))
    ad.log.info(prop_basename+"EndTime %s" % time_list[-1].replace(" ", "-"))
    ad.log.info(prop_basename+"TotalFixPoints %d" % len(time_list))
    ad.log.info(prop_basename+"L5FixRate "+'{percent:.2%}'.format(
        percent=l5flag_list.count("true")/len(l5flag_list)))
    ad.log.info(prop_basename+"AvgDis %.1f" % (sum(pe_list)/len(pe_list)))
    ad.log.info(prop_basename+"MaxDis %.1f" % max(pe_list))
    # Signal values are reported from the last (latest) track point only.
    ad.log.info(prop_basename+"Ant_AvgTop4Signal %.1f" % ant_top4cn_list[-1])
    ad.log.info(prop_basename+"Ant_AvgSignal %.1f" % ant_cn_list[-1])
    ad.log.info(prop_basename+"Base_AvgTop4Signal %.1f" % base_top4cn_list[-1])
    ad.log.info(prop_basename+"Base_AvgSignal %.1f" % base_cn_list[-1])
    _log_svid_info(gnss_svid_container, prop_basename, ad)
    return track_data
1216
1217
def verify_gps_time_should_be_close_to_device_time(ad, tracking_result):
    """Check the time gap between GPS time and device time.

    In normal cases, the GPS time should be close to device time. But if GPS
    week rollover happens, the GPS time may go back to 20 years ago. In order
    to capture this issue, we assert the time diff between the GPS time and
    device time.

    Args:
        ad: The device under test.
        tracking_result: The result we get from GNSS tracking; a dict whose
          values expose report_time and device_time.

    Raises:
        Whatever asserts.assert_false raises when at least one report exceeds
        the allowed difference.
    """
    ad.log.info("Validating GPS/Device time difference")
    # Wearables get a wider tolerance (3s instead of 2s). The device handle
    # must be passed to is_device_wearable.
    max_time_diff_in_seconds = 3.0 if is_device_wearable(ad) else 2.0
    exceed_report = []
    for report in tracking_result.values():
        time_diff_in_seconds = abs((report.report_time - report.device_time).total_seconds())
        if time_diff_in_seconds > max_time_diff_in_seconds:
            message = (f"GPS time: {report.report_time}  Device time: {report.device_time} "
                       f"diff: {time_diff_in_seconds}")
            exceed_report.append(message)
    fail_message = (f"The following items exceed {max_time_diff_in_seconds}s\n" +
                     "\n".join(exceed_report))
    asserts.assert_false(exceed_report, msg=fail_message)
1241
1242
def validate_location_fix_rate(ad, location_reported, run_time, fix_rate_criteria):
    """Verify that enough locations were reported during the run.

    The fix rate is "total_fix_points / (run_time * 60)". The test case fails
    when the number of reported locations is lower than
    run_time * 60 * fix_rate_criteria.

    Args:
        ad: AndroidDevice object
        location_reported: Collection of the reported locations; only its
            length is used.
        run_time: (int) How many minutes do we need to verify
        fix_rate_criteria: The threshold of the pass criteria
            if we expect fix rate to be 99%, then fix_rate_criteria should be 0.99
    """
    ad.log.info("Validating fix rate")
    expected_count = run_time * 60
    pass_criteria = expected_count * fix_rate_criteria
    actual_location_count = len(location_reported)

    # The raw rate can occasionally exceed 100%; cap it at 100% so the
    # reported result is standardized.
    raw_fix_rate = actual_location_count / expected_count
    actual_fix_rate = min(1, raw_fix_rate)
    actual_fix_rate_percentage = f"{actual_fix_rate:.0%}"

    log_prefix = UPLOAD_TO_SPONGE_PREFIX + "FIX_RATE_"
    ad.log.info("%sresult %s" % (log_prefix, actual_fix_rate_percentage))
    ad.log.debug("Actual location count %s" % actual_location_count)

    fail_message = (f"Fail to meet criteria. Expect to have at least {pass_criteria} location count"
                    f" Actual: {actual_location_count}")
    enough_locations = actual_location_count >= pass_criteria
    asserts.assert_true(enough_locations, msg=fail_message)
1272
1273
def _log_svid_info(container, log_prefix, ad):
    """Log the satellite ids held by a GnssSvidContainer.

    For every satellite type used in fix, the id count is logged at info
    level behind log_prefix and the ids themselves at debug level. Ids not
    used in fix are logged at debug level only.

    Args:
        container: A GnssSvidContainer object
        log_prefix:
            A prefix used to specify the log will be upload to dashboard
        ad: An AndroidDevice object
    """
    used_by_type = container.used_in_fix
    for constellation, sat_ids in used_by_type.items():
        ad.log.info(f"{log_prefix}{constellation} {len(sat_ids)}")
        ad.log.debug("Satellite used in fix %s ids are: %s", constellation, sat_ids)

    unused_by_type = container.not_used_in_fix
    for constellation, sat_ids in unused_by_type.items():
        ad.log.debug("Satellite not used in fix %s ids are: %s", constellation, sat_ids)
1289
1290
def process_ttff_by_gtw_gpstool(ad, begin_time, true_position, api_type="gnss"):
    """Process TTFF and record results in ttff_data.

    Polls logcat every 5 seconds for "write TTFF log" messages and turns each
    one into a TTFF_REPORT keyed by its loop number. The polling ends when a
    "stop gps test" message shows up in logcat.

    Args:
        ad: An AndroidDevice object.
        begin_time: test begin time.
        true_position: Coordinate as [latitude, longitude] to calculate
        position error.
        api_type: Different API for location fix. Use gnss/flp/nmea

    Returns:
        ttff_data: A dict of all TTFF data.

    Raises:
        signals.TestError: when no "write TTFF log" message is seen for 120
          seconds, or when logcat shows that GPSTool was force finished.
    """
    ttff_lat = 0
    ttff_lon = 0
    # Initialised like lat/lon so that a loop with a non-zero TTFF but no
    # matching location log (or an api_type other than gnss/flp) does not
    # hit an unbound variable when the report is built.
    ttff_haccu = 0
    utc_time = epoch_to_human_time(get_current_epoch_time())
    ttff_data = {}
    ttff_loop_time = get_current_epoch_time()
    while True:
        # Epoch times are in milliseconds: give up after 120 seconds without
        # a new TTFF log.
        if get_current_epoch_time() - ttff_loop_time >= 120000:
            raise signals.TestError("Fail to search specific GPSService "
                                    "message in logcat. Abort test.")
        if not ad.is_adb_logcat_on:
            ad.start_adb_logcat()
        logcat_results = ad.search_logcat("write TTFF log", ttff_loop_time)
        if logcat_results:
            # Restart the watchdog window and only look at newer logs next.
            ttff_loop_time = get_current_epoch_time()
            # The fields are read by token position in the whitespace-split
            # message. NOTE(review): this relies on GPSTool's exact log
            # layout; confirm against GPSTool.
            ttff_log = logcat_results[-1]["log_message"].split()
            ttff_loop = int(ttff_log[8].split(":")[-1])
            ttff_sec = float(ttff_log[11])
            if ttff_sec != 0.0:
                ttff_ant_cn = float(ttff_log[18].strip("]"))
                ttff_base_cn = float(ttff_log[25].strip("]"))
                if api_type == "gnss":
                    gnss_results = ad.search_logcat("GPSService: Check item",
                                                    begin_time)
                    if gnss_results:
                        ad.log.debug(gnss_results[-1]["log_message"])
                        gnss_location_log = \
                            gnss_results[-1]["log_message"].split()
                        ttff_lat = float(
                            gnss_location_log[8].split("=")[-1].strip(","))
                        ttff_lon = float(
                            gnss_location_log[9].split("=")[-1].strip(","))
                        loc_time = int(
                            gnss_location_log[10].split("=")[-1].strip(","))
                        utc_time = epoch_to_human_time(loc_time)
                        ttff_haccu = float(
                            gnss_location_log[11].split("=")[-1].strip(","))
                elif api_type == "flp":
                    flp_results = ad.search_logcat("GPSService: FLP Location",
                                                   begin_time)
                    if flp_results:
                        ad.log.debug(flp_results[-1]["log_message"])
                        flp_location_log = flp_results[-1][
                            "log_message"].split()
                        ttff_lat = float(flp_location_log[8].split(",")[0])
                        ttff_lon = float(flp_location_log[8].split(",")[1])
                        ttff_haccu = float(flp_location_log[9].split("=")[1])
                        utc_time = epoch_to_human_time(get_current_epoch_time())
            else:
                # A TTFF of 0.0 marks a loop without a fix; the signal values
                # are read from tokens one position further right.
                ttff_ant_cn = float(ttff_log[19].strip("]"))
                ttff_base_cn = float(ttff_log[26].strip("]"))
                ttff_lat = 0
                ttff_lon = 0
                ttff_haccu = 0
                utc_time = epoch_to_human_time(get_current_epoch_time())
            ad.log.debug("TTFF Loop %d - (Lat, Lon) = (%s, %s)" % (ttff_loop,
                                                                   ttff_lat,
                                                                   ttff_lon))
            ttff_pe = calculate_position_error(
                ttff_lat, ttff_lon, true_position)
            ttff_data[ttff_loop] = TTFF_REPORT(utc_time=utc_time,
                                               ttff_loop=ttff_loop,
                                               ttff_sec=ttff_sec,
                                               ttff_pe=ttff_pe,
                                               ttff_ant_cn=ttff_ant_cn,
                                               ttff_base_cn=ttff_base_cn,
                                               ttff_haccu=ttff_haccu)
            ad.log.info("UTC Time = %s, Loop %d = %.1f seconds, "
                        "Position Error = %.1f meters, "
                        "Antenna Average Signal = %.1f dbHz, "
                        "Baseband Average Signal = %.1f dbHz, "
                        "Horizontal Accuracy = %.1f meters" % (utc_time,
                                                                 ttff_loop,
                                                                 ttff_sec,
                                                                 ttff_pe,
                                                                 ttff_ant_cn,
                                                                 ttff_base_cn,
                                                                 ttff_haccu))
        stop_gps_results = ad.search_logcat("stop gps test", begin_time)
        if stop_gps_results:
            ad.send_keycode("HOME")
            break
        crash_result = ad.search_logcat("Force finishing activity "
                                        "com.android.gpstool/.GPSTool",
                                        begin_time)
        if crash_result:
            raise signals.TestError("GPSTool crashed. Abort test.")
        # wait 5 seconds to avoid logs not writing into logcat yet
        time.sleep(5)
    return ttff_data
1393
1394
def check_ttff_data(ad, ttff_data, ttff_mode, criteria):
    """Check every TTFF result in ttff_data against the criteria.

    Args:
        ad: An AndroidDevice object.
        ttff_data: TTFF data of secs, position error and signal strength.
        ttff_mode: TTFF Test mode for current test item.
        criteria: Criteria for current test item.

    Returns:
        True: All TTFF results are within criteria.
        False: One or more TTFF results exceed criteria or Timeout.

    Raises:
        ValueError: when ttff_data holds no TTFF loop at all.
    """
    loop_count = len(ttff_data.keys())
    ad.log.info("%d iterations of TTFF %s tests finished."
                % (loop_count, ttff_mode))
    ad.log.info("%s PASS criteria is %d seconds" % (ttff_mode, criteria))
    ad.log.debug("%s TTFF data: %s" % (ttff_mode, ttff_data))
    if loop_count == 0:
        ad.log.error("GTW_GPSTool didn't process TTFF properly.")
        raise ValueError("No ttff loop is done")

    ttff_property_key_and_value(ad, ttff_data, ttff_mode)

    # A TTFF of exactly 0.0 seconds marks a loop that timed out.
    if any(float(report.ttff_sec) == 0.0 for report in ttff_data.values()):
        ad.log.error("One or more TTFF %s Timeout" % ttff_mode)
        return False
    if any(float(report.ttff_sec) >= criteria for report in ttff_data.values()):
        ad.log.error("One or more TTFF %s are over test criteria %d seconds"
                     % (ttff_mode, criteria))
        return False
    ad.log.info("All TTFF %s are within test criteria %d seconds."
                % (ttff_mode, criteria))
    return True
1429
1430
def ttff_property_key_and_value(ad, ttff_data, ttff_mode):
    """Output ttff_data to test_run_info for ACTS plugin to parse and display
    on MobileHarness as Property.

    Loops with a TTFF of 0.0 seconds are counted as timeouts. Average and
    median time are computed over the loops that got a fix; when every loop
    timed out both are reported as 61. The max time is reported as 61 as soon
    as at least one loop timed out.

    Args:
        ad: An AndroidDevice object.
        ttff_data: TTFF data of secs, position error and signal strength.
        ttff_mode: TTFF Test mode for current test item.
    """
    timeout_ttff = 61
    prop_basename = "TestResult "+ttff_mode.replace(" ", "_")+"_TTFF_"
    reports = list(ttff_data.values())
    sec_list = [float(report.ttff_sec) for report in reports]
    pe_list = [float(report.ttff_pe) for report in reports]
    ant_cn_list = [float(report.ttff_ant_cn) for report in reports]
    base_cn_list = [float(report.ttff_base_cn) for report in reports]
    haccu_list = [float(report.ttff_haccu) for report in reports]
    # Keep only the loops that produced a fix. The 0.0 placeholders of
    # timed-out loops must not drag the median down, just as they are
    # excluded from the average.
    fixed_sec_list = sorted(sec for sec in sec_list if sec != 0.0)
    timeoutcount = len(sec_list) - len(fixed_sec_list)
    if fixed_sec_list:
        avgttff = sum(fixed_sec_list)/len(fixed_sec_list)
        median_ttff = median(fixed_sec_list)
    else:
        median_ttff = avgttff = timeout_ttff
    if timeoutcount != 0:
        maxttff = timeout_ttff
    else:
        maxttff = max(fixed_sec_list)
    avgdis = sum(pe_list)/len(pe_list)
    maxdis = max(pe_list)
    ant_avgcn = sum(ant_cn_list)/len(ant_cn_list)
    base_avgcn = sum(base_cn_list)/len(base_cn_list)
    avg_haccu = sum(haccu_list)/len(haccu_list)
    ad.log.info(prop_basename+"AvgTime %.1f" % avgttff)
    ad.log.info(prop_basename+"MedianTime %.1f" % median_ttff)
    ad.log.info(prop_basename+"MaxTime %.1f" % maxttff)
    ad.log.info(prop_basename+"TimeoutCount %d" % timeoutcount)
    ad.log.info(prop_basename+"AvgDis %.1f" % avgdis)
    ad.log.info(prop_basename+"MaxDis %.1f" % maxdis)
    ad.log.info(prop_basename+"Ant_AvgSignal %.1f" % ant_avgcn)
    ad.log.info(prop_basename+"Base_AvgSignal %.1f" % base_avgcn)
    ad.log.info(prop_basename+"Avg_Horizontal_Accuracy %.1f" % avg_haccu)
1475
1476
def calculate_position_error(latitude, longitude, true_position):
    """Return the haversine distance between a fix and the true location.

    Coordinates are given in degrees and converted with math.radians. The
    result is scaled by a sphere radius of 6371009 (presumably the mean Earth
    radius in meters).

    Args:
        latitude: latitude of location fixed in the present.
        longitude: longitude of location fixed in the present.
        true_position: [latitude, longitude] of true location coordinate.

    Returns:
        position_error of location fixed in the present.
    """
    radius = 6371009
    true_latitude = true_position[0]
    true_longitude = true_position[1]
    sin_half_dlat = math.sin(math.radians(latitude - true_latitude) / 2)
    sin_half_dlon = math.sin(math.radians(longitude - true_longitude) / 2)
    # Haversine term: squared half-chord length between the two points.
    haversine = (sin_half_dlat * sin_half_dlat +
                 math.cos(math.radians(true_latitude)) *
                 math.cos(math.radians(latitude)) *
                 sin_half_dlon * sin_half_dlon)
    central_angle = 2 * math.atan2(math.sqrt(haversine),
                                   math.sqrt(1 - haversine))
    return radius * central_angle
1497
1498
def launch_google_map(ad):
    """Launch Google Map via intent.

    The activity is started, dismissed with BACK, force-stopped and started
    again; afterwards the currently focused window/app is logged.

    Args:
        ad: An AndroidDevice object.

    Raises:
        signals.TestError: if any step of the launch sequence raises.
    """
    ad.log.info("Launch Google Map.")
    try:
        # Wearable builds expose a different main activity for Maps.
        activity = ("com.google.android.apps.gmmwearable.MainActivity"
                    if is_device_wearable(ad)
                    else "com.google.android.maps.MapsActivity")
        start_maps = "am start -S -n com.google.android.apps.maps/" + activity
        ad.adb.shell(start_maps)
        ad.send_keycode("BACK")
        ad.force_stop_apk("com.google.android.apps.maps")
        ad.adb.shell(start_maps)
    except Exception as err:
        ad.log.error(err)
        raise signals.TestError("Failed to launch google map.")
    check_current_focus_app(ad)
1521
1522
def check_current_focus_app(ad):
    """Log and return the currently focused window and app.

    Waits one second, then greps "dumpsys window" for the mCurrentFocus and
    mFocusedApp entries and writes the result to the debug log.

    Args:
        ad: An AndroidDevice object.
    Returns:
        string: the current focused window / app
    """
    focus_query = "dumpsys window | grep -E 'mCurrentFocus|mFocusedApp'"
    time.sleep(1)
    focused = ad.adb.shell(focus_query)
    ad.log.debug("\n" + focused)
    return focused
1536
1537
def check_location_api(ad, retries):
    """Verify if GnssLocationProvider API reports location.

    For each attempt, logcat is searched for "reportLocation" entries newer
    than the attempt's start time for up to 30 seconds. Between attempts adb
    logcat is restarted if it is found not running.

    Args:
        ad: An AndroidDevice object.
        retries: Retry time.

    Returns:
        True: GnssLocationProvider API reports location.
        otherwise return False.
    """
    timeout_ms = 30000
    for attempt in range(1, retries + 1):
        begin_time = get_current_epoch_time()
        ad.log.info("Try to get location report from GnssLocationProvider API "
                    "- attempt %d" % attempt)
        while get_current_epoch_time() - begin_time <= timeout_ms:
            reports = ad.search_logcat("reportLocation", begin_time)
            if not reports:
                continue
            ad.log.info("%s" % reports[-1]["log_message"])
            ad.log.info("GnssLocationProvider reports location "
                        "successfully.")
            return True
        if not ad.is_adb_logcat_on:
            ad.start_adb_logcat()
    ad.log.error("GnssLocationProvider is unable to report location.")
    return False
1564
1565
def check_network_location(ad, retries, location_type, criteria=30):
    """Verify if NLP reports location after requesting via GPSTool.

    Each attempt launches GPSTool in nlp mode and polls logcat once per
    second for "GPSTool : networkLocationType = <location_type>". GPSTool is
    dismissed with BACK both on success and after a timed-out attempt.

    Args:
        ad: An AndroidDevice object.
        retries: Retry time.
        location_type: cell or wifi.
        criteria: expected nlp return time, default 30 seconds

    Returns:
        True: NLP reports location.
        otherwise return False.
    """
    timeout_ms = criteria * 1000
    search_pattern = "GPSTool : networkLocationType = %s" % location_type
    launch_nlp = "am start -S -n com.android.gpstool/.GPSTool --es mode nlp"
    for attempt in range(1, retries + 1):
        # Capture the begin time 1 seconds before due to time gap.
        begin_time = get_current_epoch_time() - 1000
        ad.log.info("Try to get NLP status - attempt %d" % attempt)
        ad.adb.shell(launch_nlp)
        while get_current_epoch_time() - begin_time <= timeout_ms:
            # Search pattern in 1 second interval
            time.sleep(1)
            matches = ad.search_logcat(search_pattern, begin_time)
            if not matches:
                continue
            ad.log.info("Pattern Found: %s." % matches[-1]["log_message"])
            ad.send_keycode("BACK")
            return True
        if not ad.is_adb_logcat_on:
            ad.start_adb_logcat()
        ad.send_keycode("BACK")
    ad.log.error("Unable to report network location \"%s\"." % location_type)
    return False
1600
1601
def set_attenuator_gnss_signal(ad, attenuator, atten_value):
    """Set attenuation value for different GNSS signal.

    Only the first entry of attenuator is driven. Any exception raised while
    logging or setting the value is written to the error log and swallowed.

    Args:
        ad: An AndroidDevice object.
        attenuator: The attenuator object.
        atten_value: attenuation value
    """
    try:
        message = ("Set attenuation value to \"%d\" for GNSS signal."
                   % atten_value)
        ad.log.info(message)
        first_attenuator = attenuator[0]
        first_attenuator.set_atten(atten_value)
    except Exception as err:
        ad.log.error(err)
1616
1617
def set_battery_saver_mode(ad, state):
    """Enable or disable battery saver mode via adb.

    Enabling first unplugs the (simulated) battery and then turns low_power
    on; disabling turns low_power off and then resets the battery state.

    Args:
        ad: An AndroidDevice object.
        state: True is enable Battery Saver mode. False is disable.
    """
    ad.root_adb()
    if not state:
        ad.log.info("Disable Battery Saver mode.")
        commands = ("settings put global low_power 0", "cmd battery reset")
    else:
        ad.log.info("Enable Battery Saver mode.")
        commands = ("cmd battery unplug", "settings put global low_power 1")
    for command in commands:
        ad.adb.shell(command)
1634
1635
def set_gnss_qxdm_mask(ad, masks):
    """Find defined gnss qxdm mask and set as default logging mask.

    The masks are tried in order; the first one that
    tlutils.find_qxdm_log_mask reports as found is applied and the search
    stops. Nothing is applied if none is found.

    Args:
        ad: An AndroidDevice object.
        masks: Defined gnss qxdm mask.

    Raises:
        signals.TestError: if looking up or applying a mask raises.
    """
    try:
        for candidate in masks:
            if tlutils.find_qxdm_log_mask(ad, candidate):
                tlutils.set_qxdm_logger_command(ad, candidate)
                break
    except Exception as err:
        ad.log.error(err)
        raise signals.TestError("Failed to set any QXDM masks.")
1652
1653
def start_youtube_video(ad, url=None, retries=0):
    """Start youtube video and verify if audio is in music state.

    Each attempt opens the url in the YouTube app, dismisses the "new version
    available" screen if it shows up, and waits up to 15 seconds for
    ad.droid.audioIsMusicActive to become True. A failed attempt force-stops
    YouTube before the next one.

    Args:
        ad: An AndroidDevice object.
        url: Youtube video url.
        retries: Number of attempts to open the video. Values below 1 are
            treated as a single attempt.

    Returns:
        True if youtube video is playing normally.

    Raises:
        signals.TestError: if audio is not in music state after all attempts.
    """
    # range(0) would skip every attempt and report a playback failure without
    # ever opening the video, so always try at least once.
    attempts = max(retries, 1)
    for i in range(attempts):
        ad.log.info("Open an youtube video - attempt %d" % (i+1))
        cmd = ("am start -n com.google.android.youtube/"
               "com.google.android.youtube.UrlActivity -d \"%s\"" % url)
        ad.adb.shell(cmd)
        time.sleep(2)
        out = ad.adb.shell(
            "dumpsys activity | grep NewVersionAvailableActivity")
        if out:
            ad.log.info("Skip Youtube New Version Update.")
            ad.send_keycode("BACK")
        if tutils.wait_for_state(ad.droid.audioIsMusicActive, True, 15, 1):
            ad.log.info("Started a video in youtube, audio is in MUSIC state")
            return True
        ad.log.info("Force-Stop youtube and reopen youtube again.")
        ad.force_stop_apk("com.google.android.youtube")
    # Record what is on screen to help diagnose the failure.
    check_current_focus_app(ad)
    raise signals.TestError("Started a video in youtube, "
                            "but audio is not in MUSIC state")
1685
1686
def get_gms_version(ad):
    """Return the GMSCore versionName reported by dumpsys.

    Takes the first "versionName" line of the package dump for
    com.google.android.gms and returns the text after its first "=".

    Args:
        ad: An AndroidDevice object.

    Returns:
        The version string of the first versionName entry.
    """
    dump = ad.adb.shell(
        "dumpsys package com.google.android.gms | grep versionName")
    first_line = dump.split("\n")[0]
    return first_line.split("=")[1]
1690
1691
def get_baseband_and_gms_version(ad, extra_msg=""):
    """Get current radio baseband and GMSCore version of AndroidDevice object.

    Without extra_msg, the build, baseband and GMS versions plus the
    chipset-specific versions (MPSS for Qualcomm, otherwise GPS chip and
    SensorHub) are logged as "TestResult" lines. With extra_msg, only the
    baseband version is logged, prefixed by extra_msg. Any exception is
    logged as an error and swallowed.

    Args:
        ad: An AndroidDevice object.
        extra_msg: Extra message before or after the change.
    """
    mpss_version = ""
    brcm_gps_version = ""
    brcm_sensorhub_version = ""
    try:
        build_version = ad.adb.getprop("ro.build.id")
        baseband_version = ad.adb.getprop("gsm.version.baseband")
        gms_version = get_gms_version(ad)
        if not check_chipset_vendor_by_qualcomm(ad):
            brcm_gps_version = ad.adb.shell("cat /data/vendor/gps/chip.info")
            changelist_line = ad.adb.shell(
                "cat /vendor/firmware/SensorHub.patch | grep ChangeList")
            brcm_sensorhub_version = re.search(
                r'<ChangeList=(\w+)>', changelist_line).group(1)
        else:
            mpss_version = ad.adb.shell(
                "cat /sys/devices/soc0/images | grep MPSS | cut -d ':' -f 3")
        if extra_msg:
            ad.log.info(
                "%s, Baseband_Version = %s" % (extra_msg, baseband_version))
            return
        ad.log.info("TestResult Build_Version %s" % build_version)
        ad.log.info("TestResult Baseband_Version %s" % baseband_version)
        ad.log.info(
            "TestResult GMS_Version %s" % gms_version.replace(" ", ""))
        if check_chipset_vendor_by_qualcomm(ad):
            vendor_results = (("TestResult MPSS_Version %s", mpss_version),)
        else:
            vendor_results = (
                ("TestResult GPS_Version %s", brcm_gps_version),
                ("TestResult SensorHub_Version %s", brcm_sensorhub_version),
            )
        for template, version in vendor_results:
            ad.log.info(template % version)
    except Exception as err:
        ad.log.error(err)
1731
1732
def start_toggle_gnss_by_gtw_gpstool(ad, iteration):
    """Send toggle gnss off/on start_test_action

    Launches GPSTool in toggle mode (up to 3 launch attempts), confirms via
    logcat that the toggle test started, then polls logcat for
    "GPSTool : msg" entries until the test reports its end, a timeout, or a
    crash. The outcome is always signalled by raising; the function never
    returns normally. The HOME key is sent on every exit path.

    Args:
        ad: An AndroidDevice object.
        iteration: Iteration of toggle gnss off/on cycles.

    Raises:
        signals.TestPass: a logged message contains "Test end".
        signals.TestFailure: a logged message contains "timeout", or the test
            does not finish within iteration * 4 minutes.
        signals.TestError: the start action is not seen within 3 attempts,
            the test start log is missing, or GPSTool is force-finished.
    """
    # Messages already echoed to the test log, to avoid duplicates while
    # the same logcat window is searched repeatedly.
    msg_list = []
    begin_time = get_current_epoch_time()
    try:
        for i in range(1, 4):
            ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool "
                         "--es mode toggle --es cycle %d" % iteration)
            time.sleep(1)
            if is_device_wearable(ad):
                # Wait 20 seconds for Wearable low performance time.
                time.sleep(20)
                if ad.search_logcat("ToggleGPS onResume",
                                begin_time):
                    ad.log.info("Send ToggleGPS start_test_action successfully.")
                    break
            elif ad.search_logcat("cmp=com.android.gpstool/.ToggleGPS",
                                begin_time):
                ad.log.info("Send ToggleGPS start_test_action successfully.")
                break
        else:
            # for-else: reached only when none of the 3 attempts hit a break.
            check_current_focus_app(ad)
            raise signals.TestError("Fail to send ToggleGPS "
                                    "start_test_action within 3 attempts.")
        time.sleep(2)
        # Wearable and non-wearable builds log a different "test started"
        # line.
        if is_device_wearable(ad):
            test_start = ad.search_logcat("GPSService: create toggle GPS log",
                                      begin_time)
        else:
            test_start = ad.search_logcat("GPSTool_ToggleGPS: startService",
                                      begin_time)
        if test_start:
            ad.log.info(test_start[-1]["log_message"].split(":")[-1].strip())
        else:
            raise signals.TestError("Fail to start toggle GPS off/on test.")
        # Every iteration is expected to finish within 4 minutes.
        while get_current_epoch_time() - begin_time <= iteration * 240000:
            crash_end = ad.search_logcat("Force finishing activity "
                                         "com.android.gpstool/.GPSTool",
                                         begin_time)
            if crash_end:
                raise signals.TestError("GPSTool crashed. Abort test.")
            toggle_results = ad.search_logcat("GPSTool : msg", begin_time)
            if toggle_results:
                for toggle_result in toggle_results:
                    msg = toggle_result["log_message"]
                    # Echo each distinct message only once.
                    if not msg in msg_list:
                        ad.log.info(msg.split(":")[-1].strip())
                        msg_list.append(msg)
                    if "timeout" in msg:
                        raise signals.TestFailure("Fail to get location fixed "
                                                  "within 60 seconds.")
                    if "Test end" in msg:
                        raise signals.TestPass("Completed quick toggle GNSS "
                                               "off/on test.")
        raise signals.TestFailure("Fail to finish toggle GPS off/on test "
                                  "within %d minutes" % (iteration * 4))
    finally:
        # Leave GPSTool regardless of how the test ended.
        ad.send_keycode("HOME")
1797
1798
def grant_location_permission(ad, option):
    """Grant or revoke location related permission.

    Runs "pm grant" or "pm revoke" on TEST_PACKAGE_NAME for every entry of
    LOCATION_PERMISSIONS, logging each change first.

    Args:
        ad: An AndroidDevice object.
        option: Boolean to grant or revoke location related permissions.
    """
    action = "revoke"
    if option:
        action = "grant"
    for permission in LOCATION_PERMISSIONS:
        description = "%s permission:%s on %s" % (
            action, permission, TEST_PACKAGE_NAME)
        ad.log.info(description)
        pm_command = "pm %s %s %s" % (action, TEST_PACKAGE_NAME, permission)
        ad.adb.shell(pm_command)
1811
1812
def check_location_runtime_permissions(ad, package, permissions):
    """Check if runtime permissions are granted on selected package.

    The package dump is inspected for ACCESS_FINE_LOCATION up to 3 times.
    Whenever the dump does not contain "true", every entry of permissions is
    granted with "pm grant" before the next check.

    Args:
        ad: An AndroidDevice object.
        package: Apk package name to check.
        permissions: A list of permissions to be granted.

    Raises:
        signals.TestError: if none of the 3 checks sees the permission
            granted.
    """
    permission_query = (
        "dumpsys package %s | grep ACCESS_FINE_LOCATION" % package)
    for _ in range(3):
        if "true" in ad.adb.shell(permission_query):
            ad.log.info("ACCESS_FINE_LOCATION is granted on %s" % package)
            return
        ad.log.info("ACCESS_FINE_LOCATION is NOT granted on %s" % package)
        for permission in permissions:
            ad.log.debug("Grant %s on %s" % (permission, package))
            ad.adb.shell("pm grant %s %s" % (package, permission))
    raise signals.TestError(
        "Fail to grant ACCESS_FINE_LOCATION on %s" % package)
1835
1836
def install_mdstest_app(ad, mdsapp):
    """
        Install MDS test app in DUT unless com.google.mdstest is already
        installed.

        Args:
            ad: An Android Device Object
            mdsapp: Installation path of MDSTest app
    """
    if ad.is_apk_installed("com.google.mdstest"):
        return
    install_args = "-r %s" % mdsapp
    ad.adb.install(install_args, timeout=300, ignore_status=True)
1847
1848
def write_modemconfig(ad, mdsapp, nvitem_dict, modemparfile):
    """
        Modify the NV items using modem_tool.par
        Note: modem_tool.par

        Items whose key consists only of digits are written with the
        WriteEFS operation, all others with WriteNV. The tool is made
        executable first and there is a 2 second pause after every write.

        Args:
            ad:  An Android Device Object
            mdsapp: Installation path of MDSTest app
            nvitem_dict: dictionary of NV items and values.
            modemparfile: modem_tool.par path.
    """
    ad.log.info("Verify MDSTest app installed in DUT")
    install_mdstest_app(ad, mdsapp)
    os.system("chmod 777 %s" % modemparfile)
    for key, value in nvitem_dict.items():
        op_name = "WriteEFS" if key.isdigit() else "WriteNV"
        ad.log.info("Modifying the NV{!r} using {}".format(key, op_name))
        write_command = "{} --op {} --item {} --data '{}'".format(
            modemparfile, op_name, key, value)
        job.run(write_command)
        time.sleep(2)
1872
1873
def verify_modemconfig(ad, nvitem_dict, modemparfile):
    """
        Verify the NV items using modem_tool.par
        Note: modem_tool.par

        Items whose key consists only of digits are read with the ReadEFS
        operation, all others with ReadNV. The expected value must appear in
        the tool's stdout; the comparison ignores letter case.

        Args:
            ad:  An Android Device Object
            nvitem_dict: dictionary of NV items and values
            modemparfile: modem_tool.par path.

        Raises:
            ValueError: if an expected value is not found in the tool output.
    """
    os.system("chmod 777 %s" % modemparfile)
    for key, value in nvitem_dict.items():
        op_name = "ReadEFS" if key.isdigit() else "ReadNV"
        # Sleeptime to avoid Modem communication error
        time.sleep(5)
        result = job.run(
            "{} --op {} --item {}".format(modemparfile, op_name, key))
        output = str(result.stdout)
        ad.log.info("Actual Value for NV{!r} is {!r}".format(key, output))
        # Fold case on both sides: folding only the expected value would
        # never match tool output that contains uppercase characters.
        if value.casefold() not in output.casefold():
            ad.log.error("NV Value is wrong {!r} in {!r}".format(value, result))
            raise ValueError(
                "could not find {!r} in {!r}".format(value, result))
1900
1901
def check_ttff_pe(ad, ttff_data, ttff_mode, pe_criteria):
    """Verify all TTFF position errors from ttff_data against pe_criteria.

    Args:
        ad: An AndroidDevice object.
        ttff_data: TTFF data of secs, position error and signal strength.
        ttff_mode: TTFF Test mode for current test item.
        pe_criteria: Criteria for current test item.

    Returns:
        True if every position error is below pe_criteria.

    Raises:
        signals.TestFailure: if ttff_data is empty, or if any position error
            is greater than or equal to pe_criteria.
    """
    ad.log.info("%d iterations of TTFF %s tests finished."
                % (len(ttff_data.keys()), ttff_mode))
    ad.log.info("%s PASS criteria is %f meters" % (ttff_mode, pe_criteria))
    ad.log.debug("%s TTFF data: %s" % (ttff_mode, ttff_data))

    if len(ttff_data.keys()) == 0:
        ad.log.error("GTW_GPSTool didn't process TTFF properly.")
        raise signals.TestFailure("GTW_GPSTool didn't process TTFF properly.")

    if any(float(ttff_data[key].ttff_pe) >= pe_criteria
           for key in ttff_data.keys()):
        # Report the real cause; the data was processed, it is just out of
        # criteria.
        failure_msg = ("One or more TTFF %s are over test criteria %f meters"
                       % (ttff_mode, pe_criteria))
        ad.log.error(failure_msg)
        raise signals.TestFailure(failure_msg)

    ad.log.info("All TTFF %s are within test criteria %f meters." % (
        ttff_mode, pe_criteria))
    return True
1930
1931
def check_adblog_functionality(ad):
    """Restart adb logcat if system can't write logs into file after checking
    adblog file size.

    The size of adblog_<serial>_debug.txt under ad.device_log_path is sampled
    twice, half a second apart; an unchanged size triggers the restart.

    Args:
        ad: An AndroidDevice object.

    Raises:
        signals.TestError: if the adblog file does not exist.
    """
    logcat_name = "adblog_%s_debug.txt" % ad.serial
    logcat_path = os.path.join(ad.device_log_path, logcat_name)
    if not os.path.exists(logcat_path):
        raise signals.TestError("Logcat file %s does not exist." % logcat_path)
    size_before = os.path.getsize(logcat_path)
    ad.log.debug("Original adblog size is %d" % size_before)
    time.sleep(.5)
    size_after = os.path.getsize(logcat_path)
    ad.log.debug("Current adblog size is %d" % size_after)
    if size_after != size_before:
        return
    ad.log.warn("System can't write logs into file. Restart adb "
                "logcat process now.")
    ad.stop_adb_logcat()
    ad.start_adb_logcat()
1953
1954
def build_instrumentation_call(package,
                               runner,
                               test_methods=None,
                               options=None):
    """Build an instrumentation call for the tests

    An InstrumentationTestCommandBuilder is used when test_methods is given
    (even if empty); otherwise a plain InstrumentationCommandBuilder is used.
    The "-w" flag is always added.

    Args:
        package: A string to identify test package.
        runner: A string to identify test runner.
        test_methods: A dictionary contains {class_name, test_method}.
        options: A dictionary constant {key, value} param for test.

    Returns:
        An instrumentation call command.
    """
    if test_methods is not None:
        builder = InstrumentationTestCommandBuilder()
    else:
        builder = InstrumentationCommandBuilder()
        test_methods = {}
    if options is None:
        options = {}
    builder.set_manifest_package(package)
    builder.set_runner(runner)
    builder.add_flag("-w")
    for class_name, test_method in test_methods.items():
        builder.add_test_method(class_name, test_method)
    for key, value in options.items():
        builder.add_key_value_param(key, value)
    return builder.build()
1985
1986
def check_chipset_vendor_by_qualcomm(ad):
    """Check if chipset vendor is by Qualcomm.

    On wearables the full getprop output is matched against
    _WEARABLE_QCOM_VENDOR_REGEX; on other devices the gsm.version.ril-impl
    property is checked for the text "Qualcomm".

    Args:
        ad: An AndroidDevice object.

    Returns:
        True if it's by Qualcomm. False if not.
    """
    if not is_device_wearable(ad):
        soc = str(ad.adb.shell("getprop gsm.version.ril-impl"))
        ad.log.debug("SOC = %s" % soc)
        return "Qualcomm" in soc
    all_props = str(ad.adb.shell("getprop"))
    return bool(_WEARABLE_QCOM_VENDOR_REGEX.search(all_props))
2003
2004
def delete_lto_file(ad):
    """Delete downloaded LTO files.

    Remounts the device, removes everything matching /data/vendor/gps/lto*
    and logs the output of the removal command.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    removal_output = ad.adb.shell("rm -rf /data/vendor/gps/lto*")
    ad.log.info("Delete downloaded LTO files.\n%s" % removal_output)
2014
2015
def lto_mode(ad, state):
    """Enable or Disable LTO mode.

    Downloaded LTO files are deleted first. Enabling pulls
    /etc/gps_debug.conf, drops every line that mentions one of the PSDS
    server keys and pushes the file back. Disabling appends
    DISABLE_LTO_FILE_CONTENTS to /etc/gps_debug.conf on the device. The
    device is rebooted in both cases.

    Args:
        ad: An AndroidDevice object.
        state: True to enable. False to disable.
    """
    server_list = ["LONGTERM_PSDS_SERVER_1",
                   "LONGTERM_PSDS_SERVER_2",
                   "LONGTERM_PSDS_SERVER_3",
                   "NORMAL_PSDS_SERVER",
                   "REALTIME_PSDS_SERVER"]
    delete_lto_file(ad)
    if state:
        tmp_path = tempfile.mkdtemp()
        # try/finally so the scratch directory is removed even when pulling,
        # rewriting or pushing the file fails.
        try:
            ad.pull_files("/etc/gps_debug.conf", tmp_path)
            gps_conf_path = os.path.join(tmp_path, "gps_debug.conf")
            with open(gps_conf_path, "r") as gps_conf_file:
                lines = gps_conf_file.readlines()
            with open(gps_conf_path, "w") as fout:
                # Keep only the lines that reference none of the servers.
                fout.writelines(
                    line for line in lines
                    if not any(server in line for server in server_list))
            ad.push_system_file(gps_conf_path, "/etc/gps_debug.conf")
            ad.log.info("Push back modified gps_debug.conf")
            ad.log.info("LTO/RTO/RTI enabled")
        finally:
            shutil.rmtree(tmp_path, ignore_errors=True)
    else:
        ad.adb.shell("echo %r >> /etc/gps_debug.conf" %
                     DISABLE_LTO_FILE_CONTENTS)
        ad.log.info("LTO/RTO/RTI disabled")
    reboot(ad)
2052
2053
def lto_mode_wearable(ad, state):
    """Enable or Disable LTO mode for wearable in Android R release.

    Downloaded LTO files are deleted first. Then the RtoEnable, RtiEnable and
    HttpDirectSyncLto attributes in /vendor/etc/gnss/gps.xml are switched to
    "true" (enable) or "false" (disable) and the file is pushed back.
    Finally /etc/gps_debug.conf is updated: enabling removes every line that
    mentions one of the XTRA server keys, disabling appends
    DISABLE_LTO_FILE_CONTENTS_R. This function does not reboot the device.

    Args:
        ad: An AndroidDevice object.
        state: True to enable. False to disable.
    """
    if state:
        old_value, new_value, status = "false", "true", "enabled"
    else:
        old_value, new_value, status = "true", "false", "disabled"
    # (gps.xml attribute name, label used in the log message)
    xml_switches = (("RtoEnable", "RTO"),
                    ("RtiEnable", "RTI"),
                    ("HttpDirectSyncLto", "LTO sync"))
    server_list = ["XTRA_SERVER_1", "XTRA_SERVER_2", "XTRA_SERVER_3"]
    delete_lto_file(ad)

    tmp_path = tempfile.mkdtemp()
    # try/finally so the scratch directory is removed even when pulling,
    # rewriting or pushing the file fails.
    try:
        ad.pull_files("/vendor/etc/gnss/gps.xml", tmp_path)
        gps_xml_path = os.path.join(tmp_path, "gps.xml")
        with open(gps_xml_path, "r") as gps_xml_file:
            lines = gps_xml_file.readlines()
        with open(gps_xml_path, "w") as fout:
            for line in lines:
                for attribute, label in xml_switches:
                    # A match replaces the whole line, including its
                    # 4-space indentation and trailing newline.
                    if ('    %s="%s"\n' % (attribute, old_value)) in line:
                        line = '    %s="%s"\n' % (attribute, new_value)
                        ad.log.info("%s %s" % (label, status))
                        break
                fout.write(line)
        ad.push_system_file(gps_xml_path, "/vendor/etc/gnss/gps.xml")
        ad.log.info("Push back modified gps.xml")
    finally:
        shutil.rmtree(tmp_path, ignore_errors=True)

    if state:
        xtra_tmp_path = tempfile.mkdtemp()
        try:
            ad.pull_files("/etc/gps_debug.conf", xtra_tmp_path)
            gps_conf_path = os.path.join(xtra_tmp_path, "gps_debug.conf")
            with open(gps_conf_path, "r") as gps_conf_file:
                lines = gps_conf_file.readlines()
            with open(gps_conf_path, "w") as fout:
                # Keep only the lines that reference none of the servers.
                fout.writelines(
                    line for line in lines
                    if not any(server in line for server in server_list))
            ad.push_system_file(gps_conf_path, "/etc/gps_debug.conf")
            ad.log.info("Push back modified gps_debug.conf")
            ad.log.info("LTO/RTO/RTI enabled")
        finally:
            shutil.rmtree(xtra_tmp_path, ignore_errors=True)
    else:
        ad.adb.shell(
            "echo %r >> /etc/gps_debug.conf" % DISABLE_LTO_FILE_CONTENTS_R)
        ad.log.info("LTO/RTO/RTI disabled")
2124
2125
def start_pixel_logger(ad, max_log_size_mb=100, max_number_of_files=500):
    """adb to start pixel logger for GNSS logging.

    The start intent is sent up to 3 times. After each send, logcat is polled
    for the "logging started" line until 60 seconds after the attempt's
    (3-second back-dated) begin time; if it never shows up, Pixel Logger is
    stopped before the next attempt.

    Args:
        ad: An AndroidDevice object.
        max_log_size_mb: Determines when to create a new log file if current
            one reaches the size limit.
        max_number_of_files: Determines how many log files can be saved on DUT.

    Returns:
        True if Pixel Logger is seen starting to record. None (after logging
        a warning) if all attempts fail.
    """
    retries = 3
    start_timeout_sec = 60
    default_gnss_cfg = "/vendor/etc/mdlog/DEFAULT+SECURITY+FULLDPL+GPS.cfg"
    # The chipset vendor cannot change while polling, so query it once here
    # instead of issuing the adb query on every poll iteration.
    if check_chipset_vendor_by_qualcomm(ad):
        start_cmd = ("am startservice -a com.android.pixellogger."
                     "service.logging.LoggingService.ACTION_START_LOGGING "
                     "-e intent_key_cfg_path '%s' "
                     "--ei intent_key_max_log_size_mb %d "
                     "--ei intent_key_max_number_of_files %d" %
                     (default_gnss_cfg, max_log_size_mb, max_number_of_files))
        start_pattern = "ModemLogger: Start logging"
    else:
        start_cmd = ("am startservice -a com.android.pixellogger."
                     "service.logging.LoggingService.ACTION_START_LOGGING "
                     "-e intent_logger brcm_gps "
                     "--ei intent_key_max_log_size_mb %d "
                     "--ei intent_key_max_number_of_files %d" %
                     (max_log_size_mb, max_number_of_files))
        start_pattern = "startRecording"
    for attempt in range(retries):
        # Back-date the search window by 3 seconds.
        begin_time = get_current_epoch_time() - 3000
        ad.log.info("Start Pixel Logger - Attempt %d" % (attempt + 1))
        ad.adb.shell(start_cmd)
        while get_current_epoch_time() - begin_time <= start_timeout_sec * 1000:
            if not ad.is_adb_logcat_on:
                ad.start_adb_logcat()
            if ad.search_logcat(start_pattern, begin_time):
                ad.log.info("Pixel Logger starts recording successfully.")
                return True
        stop_pixel_logger(ad)
    ad.log.warn("Pixel Logger fails to start recording in %d seconds "
                "within %d attempts." % (start_timeout_sec, retries))
2171
2172
def stop_pixel_logger(ad):
    """adb to stop pixel logger for GNSS logging.

    The stop intent is sent up to 3 times. Once logcat shows the logging
    service stopping, logcat is polled for up to 30 more seconds for the
    "zip file has been created" line. An attempt that never sees the service
    stopping ends with Pixel Logger being force-stopped.

    Args:
        ad: An AndroidDevice object.

    Returns:
        True if the service stopped and the zip file was created, False if it
        stopped but no zip file was seen in time. None (after logging a
        warning) if the service was never seen stopping.
    """
    retries = 3
    stop_timeout_sec = 60
    zip_timeout_sec = 30
    stop_cmd = ("am startservice -a com.android.pixellogger."
                "service.logging.LoggingService.ACTION_STOP_LOGGING")
    if not check_chipset_vendor_by_qualcomm(ad):
        stop_cmd += " -e intent_logger brcm_gps"
    for attempt in range(retries):
        # Back-date the search window by 3 seconds.
        begin_time = get_current_epoch_time() - 3000
        ad.log.info("Stop Pixel Logger - Attempt %d" % (attempt + 1))
        ad.adb.shell(stop_cmd)
        while get_current_epoch_time() - begin_time <= stop_timeout_sec * 1000:
            if not ad.is_adb_logcat_on:
                ad.start_adb_logcat()
            if not ad.search_logcat(
                    "LoggingService: Stopping service", begin_time):
                continue
            ad.log.info("Pixel Logger stops successfully.")
            zip_deadline = time.time() + zip_timeout_sec
            while time.time() < zip_deadline:
                if ad.search_logcat(
                        "FileUtil: Zip file has been created", begin_time):
                    ad.log.info("Pixel Logger created zip file "
                                "successfully.")
                    return True
            ad.log.warn("Pixel Logger failed to create zip file.")
            return False
        ad.force_stop_apk("com.android.pixellogger")
    ad.log.warn("Pixel Logger fails to stop in %d seconds within %d "
                "attempts." % (stop_timeout_sec, retries))
2215
2216
def launch_eecoexer(ad):
    """Launch EEcoexer.

    Starts com.google.eecoexer/.MainActivity with the MAIN action through
    "am start".

    Args:
        ad: An AndroidDevice object.
    Raise:
        signals.TestError if DUT fails to launch EEcoexer
    """
    # Keep "-n" and the component name as two separate arguments.
    launch_cmd = ("am start -a android.intent.action.MAIN -n "
                  "com.google.eecoexer/.MainActivity")
    ad.log.info("Launch EEcoexer.")
    try:
        # The adb call is the step that can fail, so it is the one guarded.
        ad.adb.shell(launch_cmd)
    except Exception as e:
        ad.log.error(e)
        raise signals.TestError("Failed to launch EEcoexer.")
2234
2235
def execute_eecoexer_function(ad, eecoexer_args):
    """Queue an EEcoexer function, run it and wait for its completion.

    Three broadcasts are sent to the EEcoexer listener in order: ENQUEUE with
    the full argument string, EXECUTE, and WAIT_FOR_COMPLETE with the first
    two comma-separated fields of the argument string.

    Args:
        ad: An AndroidDevice object.
        eecoexer_args: EEcoexer function arguments as a comma-separated string.
    """
    broadcast = ("am broadcast -a com.google.eecoexer.action.LISTENER"
                 " --es sms_body ")
    # Only the first two fields are handed to WAIT_FOR_COMPLETE.
    leading_fields = ",".join(eecoexer_args.split(",")[:2])
    steps = (
        ("EEcoexer Add Enqueue: {}".format(eecoexer_args),
         broadcast + "ENQUEUE,{}".format(eecoexer_args)),
        ("EEcoexer Excute.",
         broadcast + "EXECUTE"),
        ("Wait EEcoexer for complete",
         broadcast + "WAIT_FOR_COMPLETE,{}".format(leading_fields)),
    )
    for message, command in steps:
        ad.log.info(message)
        ad.adb.shell(command)
2257
2258
def get_process_pid(ad, process_name):
    """Look up a process ID on the device through "ps -A".

    Args:
        ad: The device under test.
        process_name: Name handed to grep to filter the process list.

    Returns:
        The raw shell output: the second column of every "ps -A" line that
        grep matched.
    """
    # The pipeline output is returned untouched; no filtering is done here.
    return ad.adb.shell(f"ps -A | grep {process_name} |  awk '{{print $2}}'")
2272
2273
def restart_gps_daemons(ad, service):
    """Kill one GPS daemon and wait until it is back under a new PID.

    Args:
        ad: An AndroidDevice object.
        service: Name of the daemon process to kill.

    Returns:
        The value of get_current_epoch_time() taken right after the kill
        command was issued.
    """
    ad.root_adb()
    ad.log.info("Kill GPS daemon \"%s\"" % service)
    old_pid = get_process_pid(ad, service)
    ad.log.debug("%s PID: %s" % (service, old_pid))
    ad.adb.shell(f"kill -9 {old_pid}")
    killed_at = get_current_epoch_time()
    # Give the daemon up to 20 seconds to show up again.
    restarted_pid, recover_time = get_new_pid_process_time(
        ad, old_pid, service, 20)

    ad.log.info("GPS daemon \"%s\" restarts successfully. PID from %s to %s" % (
        service, old_pid, restarted_pid))
    ad.log.info("\t- \"%s\" process recovered time: %d ms" % (service, recover_time))
    return killed_at
2296
2297
def get_new_pid_process_time(ad, origin_pid, process_name, timeout):
    """Wait for a process to reappear under a new PID.

    Args:
        ad: An AndroidDevice object.
        origin_pid: The PID the process had before it was killed.
        process_name: Name of the process.
        timeout: Maximum time to keep polling, in seconds.

    Returns:
        A tuple (new_pid, elapsed):
            new_pid: The PID reported for the restarted process.
            elapsed: Difference of get_current_epoch_time() between entering
                this function and detecting the new PID.

    Raises:
        ValueError: If no PID different from origin_pid shows up in time.
    """
    begin_time = get_current_epoch_time()
    timeout_ms = timeout * 1000
    while get_current_epoch_time() - begin_time < timeout_ms:
        pid = get_process_pid(ad, process_name)
        # Keep polling while the process is absent or still listed under its
        # old PID; only a different, non-empty PID counts as a restart.
        if pid and pid != origin_pid:
            ad.log.debug("%s new PID: %s" % (process_name, pid))
            return pid, get_current_epoch_time() - begin_time
    raise ValueError("Unable to restart \"%s\"" % process_name)
2319
2320
def get_gpsd_update_time(ad, begin_time, dwelltime=30):
    """Find when the first "Gnss status update" log appears after begin_time.

    Args:
        ad: An AndroidDevice object.
        begin_time: Start time of the log search. It is divided by 1000
            before being converted with datetime.fromtimestamp for logging.
        dwelltime: Seconds to sleep before searching the log. Default is 30.

    Returns:
        The timestamp of the first matching log entry, as an integer number
        of milliseconds.

    Raises:
        ValueError: If no matching log entry is found.
    """
    search_from = datetime.fromtimestamp(begin_time / 1000)
    ad.log.info("Checking GNSS status after %s", search_from)
    time.sleep(dwelltime)

    matches = ad.search_logcat("Gnss status update", begin_time=begin_time)
    if matches:
        ad.log.info("GNSS status update found.")
        first_seen = matches[0]["datetime_obj"]
        return int(first_seen.timestamp() * 1000)
    raise ValueError("No \"GNSS status update\" found in logs.")
2342
2343
def get_location_fix_time_via_gpstool_log(ad, begin_time):
    """Find when GPSTool first logged a location time after begin_time.

    Args:
        ad: An AndroidDevice object.
        begin_time: The start time of the log search.

    Returns:
        The timestamp of the first "GPSService: Time" log entry, as an
        integer number of milliseconds.

    Raises:
        ValueError: If no matching log entry is found.
    """
    matches = ad.search_logcat("GPSService: Time", begin_time=begin_time)
    if matches:
        first_fix = matches[0]["datetime_obj"]
        return int(first_fix.timestamp() * 1000)
    raise ValueError("No \"Location fix time\" found in logs.")
2361
2362
def get_gps_process_and_kill_function_by_vendor(ad):
    """Pick the kill/restart callables that fit the device's chipset vendor.

    Nothing is killed here; the returned callables are only prepared with
    functools.partial and have to be invoked by the caller.

    Args:
        ad: An AndroidDevice object.

    Returns:
        A dict mapping a process name to a callable bound to ad:
        {"ssr": ...} when check_chipset_vendor_by_qualcomm(ad) is true,
        otherwise one restart_gps_daemons callable each for "gpsd", "scd"
        and "lhd".
    """
    if not check_chipset_vendor_by_qualcomm(ad):
        ad.log.info("Triggered restarting GPS daemons")
        return {
            daemon: functools.partial(restart_gps_daemons, ad=ad, service=daemon)
            for daemon in ("gpsd", "scd", "lhd")
        }
    ad.log.info("Triggered modem SSR")
    return {"ssr": functools.partial(gnss_trigger_modem_ssr_by_mds, ad=ad)}
2382
2383
def is_device_wearable(ad):
    """Check whether the device is a wearable project.

    Args:
        ad: An AndroidDevice object.

    Returns:
        True if the property "ro.cw.home_package_names" contains "wearable",
        False otherwise.
    """
    home_packages = ad.adb.getprop("ro.cw.home_package_names")
    ad.log.debug("[ro.cw.home_package_names]: [%s]" % home_packages)
    is_wearable = "wearable" in home_packages
    return is_wearable
2393
2394
def is_mobile_data_on(ad):
    """Check if mobile data of the device is on.

    Wearables are checked through the global "cell_on" setting; every other
    device is checked through SL4A's telephonyIsDataEnabled().

    Args:
        ad: An AndroidDevice object.

    Returns:
        For wearables, True if the "cell_on" setting output contains "1".
        Otherwise whatever ad.droid.telephonyIsDataEnabled() returns.
    """
    if not is_device_wearable(ad):
        return ad.droid.telephonyIsDataEnabled()
    cell_setting = ad.adb.shell("settings get global cell_on")
    ad.log.debug("Current mobile status is %s" % cell_setting)
    return "1" in cell_setting
2407
2408
def human_to_epoch_time(human_time):
    """Convert human readable time to epoch time.

    Args:
        human_time: Human readable time. (Ex: 2020-08-04 13:24:28.900)
            Dates written with "/" separators (2020/08/04 ...) are accepted
            as well. The fractional part is optional.

    Returns:
        epoch: Epoch time in milliseconds, measured from the naive
            1970-01-01 00:00:00, or None if human_time cannot be parsed.
    """
    # str.replace returns a new string, so the result has to be kept;
    # otherwise "/"-separated dates never match the formats below.
    normalized = human_time.replace("/", "-")
    if "." in normalized:
        time_format = "%Y-%m-%d %H:%M:%S.%f"
    else:
        time_format = "%Y-%m-%d %H:%M:%S"
    try:
        parsed = datetime.strptime(normalized, time_format)
    except ValueError:
        return None
    # Same naive value as the deprecated datetime.utcfromtimestamp(0).
    epoch_start = datetime(1970, 1, 1)
    return int((parsed - epoch_start).total_seconds() * 1000)
2430
2431
2432def _get_dpo_info_from_logcat(ad, begin_time):
2433    """Gets the DPO info from logcat.
2434
2435    Args:
2436        ad: The device under test.
2437        begin_time: The start time of the log.
2438    """
2439    dpo_results = ad.search_logcat("HardwareClockDiscontinuityCount",
2440                                   begin_time)
2441    if not dpo_results:
2442        raise signals.TestError(
2443            "No \"HardwareClockDiscontinuityCount\" is found in logs.")
2444    return dpo_results
2445
2446
def check_dpo_rate_via_gnss_meas(ad, begin_time, dpo_threshold):
    """Check DPO engage rate through "HardwareClockDiscontinuityCount" in
    GnssMeasurement callback.

    The rate is the growth of the count between the first and the last
    matching log entry, divided by the seconds between their timestamps
    (plus one).

    Args:
        ad: An AndroidDevice object.
        begin_time: test begin time.
        dpo_threshold: The value to set threshold. (Ex: dpo_threshold = 60)

    Raises:
        Whatever asserts.assert_true raises when the rate (in percent) is not
        above dpo_threshold.
    """
    timestamp_pattern = re.compile(
        r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3})')
    records = _get_dpo_info_from_logcat(ad, begin_time)
    first_message = records[0]["log_message"]
    ad.log.info(first_message)
    last_message = records[-1]["log_message"]
    ad.log.info(last_message)

    start_text = timestamp_pattern.search(first_message).group(1)
    end_text = timestamp_pattern.search(last_message).group(1)
    gnss_start_epoch = human_to_epoch_time(start_text)
    gnss_stop_epoch = human_to_epoch_time(end_text)
    test_time_in_sec = round((gnss_stop_epoch - gnss_start_epoch) / 1000) + 1

    # The count is the last whitespace-separated token of the log message.
    first_dpo_count = int(first_message.split()[-1])
    final_dpo_count = int(last_message.split()[-1])
    engaged_count = final_dpo_count - first_dpo_count
    dpo_rate = engaged_count / test_time_in_sec
    dpo_engage_rate = "{percent:.2%}".format(percent=dpo_rate)

    ad.log.info("DPO is ON for %d seconds during %d seconds test." % (
        engaged_count, test_time_in_sec))
    ad.log.info("TestResult DPO_Engage_Rate " + dpo_engage_rate)
    threshold = "{percent:.0%}".format(percent=dpo_threshold / 100)
    failure_message = ("DPO only engaged %s in %d seconds test with "
                       "threshold %s." % (dpo_engage_rate,
                                          test_time_in_sec,
                                          threshold))
    asserts.assert_true(dpo_rate * 100 > dpo_threshold, failure_message)
2480
2481
def parse_brcm_nmea_log(ad, nmea_pattern, brcm_error_log_allowlist, stop_logger=True):
    """Parse specific NMEA pattern out of BRCM NMEA log.

    The Pixel Logger GPS log directory is pulled into a temporary directory
    and every gl*.log file below the last (sorted) log folder is scanned.

    Args:
        ad: An AndroidDevice object.
        nmea_pattern: Specific NMEA pattern to parse, as a regex string or a
            compiled pattern. A stripped log line has to match it fully.
        brcm_error_log_allowlist: Regex patterns of benign error logs to
            exclude.
        stop_logger: To stop pixel logger or not.

    Returns:
        A tuple (brcm_log_list, brcm_error_log):
            brcm_log_list: A list of specific NMEA pattern logs.
            brcm_error_log: The error lines that are not covered by the
                allow-list, joined with newlines. Empty string if none.

    Raises:
        FileNotFoundError: If the logs cannot be pulled or no log folder
            exists in the pulled directory.
    """
    brcm_log_list = []
    brcm_log_error_pattern = ("lhd: FS: Start Failsafe dump", "E slog")
    brcm_error_log_list = []
    pixellogger_path = (
        "/sdcard/Android/data/com.android.pixellogger/files/logs/gps/.")
    if not isinstance(nmea_pattern, re.Pattern):
        nmea_pattern = re.compile(nmea_pattern)

    with tempfile.TemporaryDirectory() as tmp_dir:
        try:
            ad.pull_files(pixellogger_path, tmp_dir)
        except AdbCommandError as e:
            raise FileNotFoundError("No pixel logger folders found") from e

        # Although we don't rely on the zip file, stop pixel logger here to avoid
        # wasting resources.
        if stop_logger:
            stop_pixel_logger(ad)

        log_folders = sorted(
            x for x in pathlib.Path(tmp_dir).iterdir() if x.is_dir())
        if not log_folders:
            raise FileNotFoundError("No BRCM logs found.")

        # The folder name is a string of datetime, the latest one will be in the last index.
        for nmea_log_path in log_folders[-1].glob("**/gl*.log"):
            ad.log.info("Parsing log pattern of \"%s\" in %s" % (nmea_pattern,
                                                                 nmea_log_path))
            with open(nmea_log_path, "r", encoding="UTF-8", errors="ignore") as lines:
                for line in lines:
                    line = line.strip()
                    if nmea_pattern.fullmatch(line):
                        brcm_log_list.append(line)
                    # Record a line once, even if it contains several of the
                    # error markers.
                    if not any(attr in line for attr in brcm_log_error_pattern):
                        continue
                    if any(re.search(regex_pattern, line)
                           for regex_pattern in brcm_error_log_allowlist):
                        ad.log.debug("\"%s\" is in allow-list and removed "
                                     "from error." % line)
                    else:
                        brcm_error_log_list.append(line)

    # Lines were stripped above, so they need an explicit separator to stay
    # readable in the failure message.
    brcm_error_log = "\n".join(brcm_error_log_list)
    return brcm_log_list, brcm_error_log
2541
2542
def _get_power_mode_log_from_pixel_logger(ad, brcm_error_log_allowlist, stop_pixel_logger=True):
    """Gets the power mode log from pixel logger.

    Args:
        ad: The device under test.
        brcm_error_log_allowlist: The allow list to ignore certain error in
            pixel logger.
        stop_pixel_logger: To disable pixel logger when getting the log.

    Returns:
        A tuple (pglor_list, brcm_error_log) as produced by
        parse_brcm_nmea_log for _BRCM_DUTY_CYCLE_PATTERN.

    Raises:
        signals.TestFailure: If no log line matched the pattern.
    """
    duty_cycle_logs, error_log = parse_brcm_nmea_log(
        ad, _BRCM_DUTY_CYCLE_PATTERN, brcm_error_log_allowlist, stop_pixel_logger)
    if duty_cycle_logs:
        return duty_cycle_logs, error_log
    raise signals.TestFailure("Fail to get DPO logs from pixel logger")
2557
2558
def check_dpo_rate_via_brcm_log(ad, dpo_threshold, brcm_error_log_allowlist):
    """Check DPO engage rate through "$PGLOR,11,STA" in BRCM Log.

    The power state is read from the ",P,<state>," field of each log line:
    D - Disabled, Always full power.
    F - Enabled, now in full power mode.
    S - Enabled, now in power save mode.
    H - Host off load mode.

    Args:
        ad: An AndroidDevice object.
        dpo_threshold: The value to set threshold. (Ex: dpo_threshold = 60)
        brcm_error_log_allowlist: Benign error logs to exclude.

    Raises:
        Whatever asserts.assert_true raises when the power save rate (in
        percent) is not above dpo_threshold or an error log was found.
    """
    pglor_list, brcm_error_log = _get_power_mode_log_from_pixel_logger(ad, brcm_error_log_allowlist)

    power_state_pattern = re.compile(r',P,(\w),')
    # Only D, F and S are counted; any other state is ignored.
    state_counts = {"D": 0, "F": 0, "S": 0}
    for entry in pglor_list:
        state = power_state_pattern.search(entry).group(1)
        if state in state_counts:
            state_counts[state] += 1

    ordered_entries = sorted(pglor_list)
    total_count = len(pglor_list)
    power_save_count = state_counts["S"]
    ad.log.info(ordered_entries[0])
    ad.log.info(ordered_entries[-1])
    ad.log.info("TestResult Total_Count %d" % total_count)
    ad.log.info("TestResult Always_Full_Power_Count %d" %
                state_counts["D"])
    ad.log.info("TestResult Full_Power_Mode_Count %d" % state_counts["F"])
    ad.log.info("TestResult Power_Save_Mode_Count %d" % power_save_count)

    dpo_rate = power_save_count / total_count
    dpo_engage_rate = "{percent:.2%}".format(percent=dpo_rate)
    ad.log.info("Power Save Mode is ON for %d seconds during %d seconds test."
                % (power_save_count, total_count))
    ad.log.info("TestResult DPO_Engage_Rate " + dpo_engage_rate)
    threshold = "{percent:.0%}".format(percent=dpo_threshold / 100)
    failure_message = ("Power Save Mode only engaged %s in %d seconds test "
                       "with threshold %s.\nAbnormal behavior found as below."
                       "\n%s" % (dpo_engage_rate,
                                 total_count,
                                 threshold,
                                 brcm_error_log))
    asserts.assert_true((dpo_rate * 100 > dpo_threshold) and not brcm_error_log,
                        failure_message)
2604
2605
def process_pair(watch, phone):
    """Pair phone to watch via Bluetooth in OOBE.

    The watch is factory reset first, then the companion app on the phone is
    driven through its pairing screens with uiautomator clicks.

    Args:
        watch: A wearable project.
        phone: A pixel phone.
    """
    check_location_service(phone)
    utils.sync_device_time(phone)
    model_name = watch.adb.getprop("ro.product.model")
    serial_number = watch.adb.getprop("ro.serialno")
    # The name to click is the model plus the serial number from index 10 on.
    bluetooth_name = model_name + " " + serial_number[10:]
    fastboot_factory_reset(watch, False)
    # TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
    watch.log.info("Wait 1 min for wearable system busy time.")
    time.sleep(60)
    watch.adb.shell("input keyevent 4")
    # Clear Denali paired data in phone.
    for clear_cmd in ("pm clear com.google.android.gms",
                      "pm clear com.google.android.apps.wear.companion"):
        phone.adb.shell(clear_cmd)
    phone.adb.shell("am start -S -n com.google.android.apps.wear.companion/"
                    "com.google.android.apps.wear.companion.application.RootActivity")
    # Buttons are clicked in exactly this order.
    pairing_clicks = ("Continue", "More", "I agree", "I accept",
                      bluetooth_name, "Pair", "Skip", "Next", "Skip", "Done")
    for button_text in pairing_clicks:
        uia_click(phone, button_text)
    # TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
    watch.log.info("Wait 3 mins for complete pairing process.")
    time.sleep(180)
    set_screen_always_on(watch)
    check_location_service(watch)
    enable_gnss_verbose_logging(watch)
2644
2645
def is_bluetooth_connected(watch, phone):
    """Check if the watch reports a Bluetooth connection to the phone.

    Args:
        watch: A wearable project
        phone: A pixel phone.

    Returns:
        The result of watch.droid.bluetoothIsDeviceConnected() for the
        phone's local Bluetooth address.
    """
    phone_address = phone.droid.bluetoothGetLocalAddress()
    return watch.droid.bluetoothIsDeviceConnected(phone_address)
2654
2655
def detect_crash_during_tracking(ad, begin_time, api_type, ignore_hal_crash=False):
    """Check if GNSS or GPSTool crash happened during GNSS Tracking.

    GNSS crashes are looked up by grepping the complete "logcat -d" dump;
    begin_time is only used for the GPSTool crash search.

    Args:
        ad: An AndroidDevice object.
        begin_time: Start Time to check if crash happened in logs.
        api_type: Using GNSS or FLP reading method in GNSS tracking.
        ignore_hal_crash: In BRCM devices, once the HAL is being killed, it
            will write error/fatal logs. Ignore this error if the error logs
            are expected.

    Raises:
        signals.TestFailure: If a GNSS crash pattern is found. GNSS tracking
            is stopped through start_gnss_by_gtw_gpstool before raising.
        signals.TestError: If GPSTool was force finished.
    """
    crash_patterns = [".*Fatal signal.*gnss",
                      ".*Fatal signal.*xtra"]
    if not ignore_hal_crash:
        crash_patterns.extend([".*Fatal signal.*gpsd", ".*F DEBUG.*gnss"])
    if not ad.is_adb_logcat_on:
        ad.start_adb_logcat()

    for pattern in crash_patterns:
        crash_output = ad.adb.shell(
            "logcat -d | grep -E -i '%s'" % pattern, ignore_status=True, timeout=300)
        if not crash_output:
            continue
        start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
        raise signals.TestFailure(
            "Test failed due to GNSS HAL crashed. \n%s" %
            crash_output)

    if ad.search_logcat("Force finishing activity "
                        "com.android.gpstool/.GPSTool",
                        begin_time):
        raise signals.TestError("GPSTool crashed. Abort test.")
2685
2686
def is_wearable_btwifi(ad):
    """Check device is wearable btwifi sku or not.

    Args:
        ad: An AndroidDevice object.

    Returns:
        True if the property "ro.product.product.name" contains "btwifi" or
        the device model is 'aurora', False otherwise.
    """
    product_name = ad.adb.getprop("ro.product.product.name")
    ad.log.debug("[ro.product.product.name]: [%s]" % product_name)
    if "btwifi" in product_name:
        return True
    # temp solution. Will check with dev team if there is a command to check.
    return ad.model == 'aurora'
2697
2698
def compare_watch_phone_location(ad,watch_file, phone_file):
    """Compare watch and phone's FLP location to see if the same or not.

    Only keys present in both dicts (with a truthy phone value) are compared.
    Two locations match when both latitude and longitude differ by no more
    than 0.000002.

    Args:
        ad: An AndroidDevice object.
        watch_file: watch's FLP locations, a dict of key -> [latitude, longitude]
        phone_file: phone's FLP locations, same layout as watch_file

    Returns:
        True if no compared location differs, False otherwise.
    """
    max_difference = 0.000002
    # One (key, watch location, phone location) record per mismatch, so the
    # log below keeps each mismatch grouped together.
    mismatches = []
    for watch_key, watch_value in watch_file.items():
        phone_value = phone_file.get(watch_key)
        if not phone_value:
            continue
        lat_diff = abs(float(watch_value[0]) - float(phone_value[0]))
        lon_diff = abs(float(watch_value[1]) - float(phone_value[1]))
        if lat_diff > max_difference or lon_diff > max_difference:
            mismatches.append((watch_key, watch_value, phone_value))

    if mismatches:
        ad.log.info("There are %s not match locations: %s" % (len(mismatches), mismatches))
        ad.log.info("Watch's locations are not using Phone's locations.")
        return False
    ad.log.info("Watch's locations are using Phone's location.")
    return True
2723
2724
def check_tracking_file(ad):
    """Check tracking file in device and save "Latitude", "Longitude", and "Time" information.

    If several *.txt files of at least 10 bytes exist in GNSSSTATUS_LOG_PATH,
    the last one listed by "ls" is parsed.

    Args:
        ad: An AndroidDevice object.

    Returns:
        location_reports: A dict keyed by characters 17-24 of each "Time:"
            line, with [latitude, longitude] (formatted to 6 decimals) taken
            from the most recent "Latitude:" / "Longitude:" lines.

    Raises:
        signals.TestError: If no usable log file is found.
    """
    txt_count = int(ad.adb.shell("find %s -type f -iname *.txt | wc -l"
                                 % GNSSSTATUS_LOG_PATH))
    if txt_count != 1:
        ad.log.error("%d API logs exist." % txt_count)

    selected_log = None
    for entry in ad.adb.shell("ls %s" % GNSSSTATUS_LOG_PATH).split():
        if not fnmatch.fnmatch(entry, "*.txt"):
            continue
        candidate = posixpath.join(GNSSSTATUS_LOG_PATH, entry)
        # "wc -c" prints the size first, followed by the file name.
        size_in_bytes = int(ad.adb.shell("wc -c %s" % candidate).split(" ")[0])
        if size_in_bytes < 10:
            ad.log.info("Skip log %s due to log size %d bytes" %
                        (entry, size_in_bytes))
            continue
        selected_log = candidate
    if not selected_log:
        raise signals.TestError("Failed to get test log file in device.")

    location_reports = {}
    for row in ad.adb.shell("cat %s" % selected_log).split("\n"):
        if "Latitude:" in row:
            file_lat = "%.6f" % float(row[9:])
        elif "Longitude:" in row:
            file_long = "%.6f" % float(row[11:])
        elif "Time:" in row:
            file_time = row[17:25]
            location_reports[file_time] = [file_lat, file_long]
    return location_reports
2763
2764
def uia_click(ad, matching_text):
    """Use uiautomator to click objects.

    Waits up to 60000 ms (the timeout passed to wait.exists) for an object
    whose text matches; logs an error instead of raising when none shows up.

    Args:
        ad: An AndroidDevice object.
        matching_text: Text of the target object to click
    """
    found = ad.uia(textMatches=matching_text).wait.exists(timeout=60000)
    if not found:
        ad.log.error("No button named %s" % matching_text)
        return
    ad.uia(textMatches=matching_text).click()
    ad.log.info("Click button %s" % matching_text)
2778
2779
def delete_bcm_nvmem_sto_file(ad):
    """Delete BCM's NVMEM ephemeris gldata.sto.

    The device is remounted first, then BCM_NVME_STO_PATH is removed with
    "rm -rf" and the command output is logged.

    Args:
        ad: An AndroidDevice object.
    """
    remount_device(ad)
    output = ad.adb.shell("rm -rf {}".format(BCM_NVME_STO_PATH))
    ad.log.info("Delete BCM's NVMEM ephemeris files.\n%s" % output)
2790
2791
def bcm_gps_xml_update_option(
    ad, child_tag, items_to_update=None, items_to_delete=None, gps_xml_path=BCM_GPS_XML_PATH):
    """Updates gps.xml attributes.

    The file is pulled from the device, the first child of the XML root whose
    tag ends with child_tag is modified, and the result is pushed back.
    The process will go through update first then delete.

    Args:
        ad: Device under test.
        child_tag: (str) Which child node should be updated.
        items_to_update: (dict) The attributes to be updated. None (default)
            means nothing to update.
        items_to_delete: (list) The attributes to be deleted. None (default)
            means nothing to delete.
        gps_xml_path: (str) The gps.xml file path. Default is BCM_GPS_XML_PATH.

    Raises:
        LookupError: If no child node of the root matches child_tag.
    """
    # None instead of {} / [] as defaults, so no mutable object is shared
    # between calls.
    if items_to_update is None:
        items_to_update = {}
    if items_to_delete is None:
        items_to_delete = []

    remount_device(ad)
    # Register the namespace as the default one to prevent "ns0" prefixes
    # from being added to the written xml file.
    ElementTree.register_namespace("", "http://www.glpals.com/")
    with tempfile.TemporaryDirectory() as temp_dir:
        local_xml = os.path.join(temp_dir, "gps.xml.ori")
        modified_xml = os.path.join(temp_dir, "gps.xml")
        ad.pull_files(gps_xml_path, local_xml)
        xml_data = ElementTree.parse(local_xml)
        root_data = xml_data.getroot()

        # Tags carry the namespace as prefix, hence the endswith() match.
        child_node = next(
            (node for node in root_data if node.tag.endswith(child_tag)), None)
        if child_node is None:
            raise LookupError(f"Couldn't find node with {child_tag}")

        for key, value in items_to_update.items():
            child_node.attrib[key] = value

        for key in items_to_delete:
            # Missing attributes are skipped silently.
            child_node.attrib.pop(key, None)

        xml_data.write(modified_xml, xml_declaration=True, encoding="utf-8", method="xml")
        ad.push_system_file(modified_xml, gps_xml_path)
    ad.log.info("Finish modify gps.xml")
2834
def bcm_gps_ignore_warmstandby(ad):
    """Remove warmstandby setting in BCM gps.xml to reset tracking filter.

    Deletes the WarmStandbyTimeout1Seconds and WarmStandbyTimeout2Seconds
    attributes from the 'gll' node.

    Args:
        ad: An AndroidDevice object.
    """
    warm_standby_attributes = ['WarmStandbyTimeout1Seconds',
                               'WarmStandbyTimeout2Seconds']
    bcm_gps_xml_update_option(
        ad, child_tag='gll', items_to_delete=warm_standby_attributes)
2845
def bcm_gps_ignore_rom_alm(ad):
    """Update BCM gps.xml with IgnoreRomAlm="true".

    Two nodes are updated, one after the other: 'hal' gets IgnoreJniTime and
    AutoColdStartSignal, "gll" gets IgnoreRomAlm.

    Args:
        ad: An AndroidDevice object.
    """
    updates_by_tag = (
        ('hal', {"IgnoreJniTime": "true",
                 "AutoColdStartSignal": "SIMULATED"}),
        ("gll", {"IgnoreRomAlm": "true"}),
    )
    for tag, attributes in updates_by_tag:
        bcm_gps_xml_update_option(ad, child_tag=tag, items_to_update=attributes)
2859
2860
def check_inject_time(ad):
    """Check if watch could get the UTC time.

    Logcat is searched for "GPSIC.OUT.gps_inject_time" up to 5 times, with a
    10 second sleep before every search.

    Args:
        ad: An AndroidDevice object.

    Returns:
        True once the log entry is found.

    Raises:
        signals.TestFailure: If the log entry is not found in any attempt.
    """
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        time.sleep(10)
        found = ad.search_logcat("GPSIC.OUT.gps_inject_time")
        ad.log.info("Check time injected - attempt %s" % attempt)
        if found:
            ad.log.info("Time is injected successfully.")
            return True
    raise signals.TestFailure(
        "Fail to get time injected within %s attempts." % max_attempts)
2875
def recover_paired_status(watch, phone):
    """Recover Bluetooth paired status if not paired.

    Bluetooth is switched off and on again on both devices, with a 10 second
    wait after each switch, then the connection is checked. This is tried up
    to 3 times.

    Args:
        watch: A wearable project.
        phone: A pixel phone.

    Returns:
        True once the watch reports being connected to the phone.

    Raises:
        signals.TestFailure: If the connection is not back after 3 attempts.
    """
    max_attempts = 3
    for _ in range(max_attempts):
        watch.log.info("Switch Bluetooth Off-On to recover paired status.")
        for bluetooth_on in (False, True):
            for device in (watch, phone):
                device.droid.bluetoothToggleState(bluetooth_on)
            # TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
            watch.log.info("Wait for Bluetooth auto re-connect.")
            time.sleep(10)
        if is_bluetooth_connected(watch, phone):
            watch.log.info("Success to recover paired status.")
            return True
    raise signals.TestFailure("Fail to recover BT paired status in 3 attempts.")
2895
def push_lhd_overlay(ad):
    """Push lhd_overlay.conf to device in /data/vendor/gps/overlay/

    Nothing is pushed when a file of that name is already listed in the
    overlay directory.

    Args:
        ad: An AndroidDevice object.
    """
    overlay_name = "lhd_overlay.conf"
    overlay_path = "/data/vendor/gps/overlay/"
    overlay_asset = ad.adb.shell(f"ls {overlay_path}")
    if overlay_name in overlay_asset:
        ad.log.info(f"{overlay_name} already in device, skip.")
        return

    lhd_content = 'Lhe477xDebugFlags=RPC:FACILITY=2097151:LOG_INFO:STDOUT_PUTS:STDOUT_LOG\n'\
                  'LogLevel=*:E\nLogLevel=*:W\nLogLevel=*:I\nLog=LOGCAT\nLogEnabled=true\n'
    # A TemporaryDirectory removes the local copy again once the push is
    # done. Assumes ad.adb.push returns only after the transfer finished.
    with tempfile.TemporaryDirectory() as temp_path:
        file_path = os.path.join(temp_path, overlay_name)
        with open(file_path, "w") as f:
            f.write(lhd_content)
        ad.log.info("Push lhd_overlay to device")
        ad.adb.push(file_path, overlay_path)
2917
2918
def disable_ramdump(ad):
    """Disable ramdump so device will reboot when about to enter ramdump

    Once device enter ramdump, it will take a while to generate dump file
    The process may take a while and block all the tests.
    By disabling the ramdump mode, device will reboot instead of entering ramdump mode

    The device is rebooted into bootloader, "ramdump disable" is sent as a
    fastboot oem command, and the device is booted again with root adb, SL4A
    and adb logcat restored.

    Args:
        ad: An AndroidDevice object.

    Raises:
        signals.TestFailure: If the device is not in bootloader mode after
            8 checks, one second apart.
    """
    ad.log.info("Enter bootloader mode")
    ad.stop_services()
    ad.adb.reboot("bootloader")

    in_bootloader = False
    for _ in range(8):
        in_bootloader = ad.is_bootloader
        if in_bootloader:
            break
        time.sleep(1)
    if not in_bootloader:
        raise signals.TestFailure("can't enter bootloader mode")

    ad.log.info("Disable ramdump")
    ad.fastboot.oem("ramdump disable")
    ad.fastboot.reboot()
    ad.wait_for_boot_completion()
    ad.root_adb()
    tutils.bring_up_sl4a(ad)
    ad.start_adb_logcat()
2945
2946
def get_device_time(ad):
    """Get current datetime from device

    The device's "date" command is asked for a
    "year-month-day hour:minute:second.milliseconds" string, which is then
    parsed.

    Args:
        ad: An AndroidDevice object.

    Returns:
        datetime object
    """
    device_time_text = ad.adb.shell("date +\"%Y-%m-%d %T.%3N\"")
    device_time = datetime.strptime(device_time_text, "%Y-%m-%d %H:%M:%S.%f")
    return device_time
2958
2959
def ensure_power_manager_is_dozing(ad, begin_time):
    """Check if power manager is in dozing
    When device is sleeping, power manager should goes to doze mode.
    To ensure that, we check the log every 1 second (maximum to 3 times)

    Only a warning is logged when the dozing log never shows up; nothing is
    raised or returned.

    Args:
        ad: An AndroidDevice object.
        begin_time: datetime, used as the starting point to search log
    """
    keyword = "PowerManagerService: Dozing"
    max_checks = 3
    ad.log.debug("Log search start time: %s" % begin_time)
    for _ in range(max_checks):
        if ad.search_logcat(keyword, begin_time):
            return
        ad.log.debug("Power manager is not dozing... retry in 1 second")
        time.sleep(1)
    ad.log.warn("Power manager didn't enter dozing")
2979
def enter_deep_doze_mode(ad, lasting_time_in_seconds: int):
    """Puts the device into deep doze mode.

    The call blocks until the requested time has passed. Deep doze mode is
    left again in any case, also when entering or waiting fails.

    Args:
        ad: The device under test.
        lasting_time_in_seconds: How long does the doze mode last.
    """
    # The deadline is fixed before doze mode is requested.
    deadline = datetime.now() + timedelta(seconds=lasting_time_in_seconds)

    try:
        ad.log.info("Enter deep doze mode for %d seconds" % lasting_time_in_seconds)
        device_doze.enter_doze_mode(ad, device_doze.DozeType.DEEP)
        while True:
            if datetime.now() >= deadline:
                break
            time.sleep(1)
    finally:
        ad.log.info("Leave deep doze mode")
        device_doze.leave_doze_mode(ad, device_doze.DozeType.DEEP)
2997
2998
def check_location_report_interval(ad, location_reported_time_src, total_seconds, tolerance):
    """Check the interval between consecutive location reported times.

    Normally the interval should be around 1 second but occasionally it may be up to nearly
    2 seconds, so a tolerance is applied: the number of intervals longer than 1.3 seconds
    must not exceed max(1, int(total_seconds * tolerance)).

    The reported times are sorted and every pair of neighbouring reports is compared. The
    outcome is only written to the device log (error level); no exception is raised.

    Args:
        ad: An AndroidDevice object.
        location_reported_time_src: A list of reported time (in string, formatted as
            "%Y/%m/%d %H:%M:%S.%f") from GPS tool
        total_seconds: (int) how many seconds has the GPS been enabled
        tolerance: (float) set how many ratio of error should be accepted
                   if we want to set tolerance to be 1% then pass 0.01 as tolerance value
    """
    ad.log.info("Checking location report frequency")
    expected_longest_interval = 1.3
    error_tolerance = max(1, int(total_seconds * tolerance))
    location_reported_time = sorted(
        datetime.strptime(reported, "%Y/%m/%d %H:%M:%S.%f")
        for reported in location_reported_time_src)
    ad.log.debug("Location report time: %s" % location_reported_time)

    if not location_reported_time:
        # Without any report there is no interval to check; report it instead of
        # failing with an IndexError.
        ad.log.error("No location report time found, can't check report interval")
        return

    # Count the gaps between neighbouring reports that are longer than allowed.
    error_count = sum(
        1
        for earlier, later in zip(location_reported_time, location_reported_time[1:])
        if (later - earlier).total_seconds() > expected_longest_interval)

    if error_count > error_tolerance:
        fail_message = (f"Interval longer than {expected_longest_interval}s "
                        f"exceed tolerance count: {error_tolerance}, error count: {error_count}")
        ad.log.error(fail_message)
3032
3033
@contextmanager
def set_screen_status(ad, off=True):
    """Context manager that optionally turns the screen off for the enclosed block.

    example:
        with set_screen_status(ad, off=True):
            do anything you want during screen is off

    When the block ends (normally or with an exception) the device is woken up
    and ensure_device_screen_is_on is called, regardless of the value of `off`.

    Args:
        ad: AndroidDevice object
        off: (bool) True -> turn off screen before the block runs
                    False -> do not touch the screen before the block runs

    Yields:
        The same AndroidDevice object that was passed in.
    """
    def _wake_up_and_verify():
        ad.droid.wakeUpNow()
        ensure_device_screen_is_on(ad)

    try:
        if off:
            ad.droid.goToSleepNow()
        yield ad
    finally:
        _wake_up_and_verify()
3054
3055
@contextmanager
def full_gnss_measurement(ad):
    """Context manager that turns on full GNSS measurement tracking.

    On entry the development settings and the full tracking setting are set to 1.
    On exit only the full tracking setting is set back to 0.

    Args:
        ad: AndroidDevice object

    Yields:
        The same AndroidDevice object that was passed in.
    """
    enable_commands = (
        "settings put global development_settings_enabled 1",
        "settings put global enable_gnss_raw_meas_full_tracking 1",
    )
    try:
        for command in enable_commands:
            ad.adb.shell(command)
        yield ad
    finally:
        ad.adb.shell("settings put global enable_gnss_raw_meas_full_tracking 0")
3065
3066
def ensure_device_screen_is_on(ad):
    """Verify that the screen is on, checking up to 3 times.

    The current focus app is checked; after each unsuccessful check the
    function waits 1 second.

    Args:
        ad: AndroidDevice object

    Raise:
        GnssTestUtilsError: if screen is still not on after 3 tries
    """
    tries_left = 3
    while tries_left > 0:
        # when NotificationShade appears in focus window, it indicates the screen is still off
        if "NotificationShade" not in check_current_focus_app(ad):
            return
        time.sleep(1)
        tries_left -= 1
    raise GnssTestUtilsError("Device screen is not on after 3 tries")
3082
3083
def start_qxdm_and_tcpdump_log(ad, enable):
    """Start the pixel logger and adb tcpdump when `enable` is true.

    Args:
        ad: AndroidDevice object
        enable: (bool) True -> start collecting
                       False -> do nothing
    """
    if not enable:
        return
    start_pixel_logger(ad)
    tlutils.start_adb_tcpdump(ad)
3094
3095
def set_screen_always_on(ad):
    """Ensure the screen will not turn off and displays the correct app screen.

    For a wearable, the device is set to stay on while plugged in and the
    charging screen is disabled, otherwise the charging screen will keep
    popping up and block the GPS tool. For any other device the screen off
    timeout setting is raised instead.

    Args:
        ad: AndroidDevice object
    """
    if not is_device_wearable(ad):
        ad.adb.shell("settings put system screen_off_timeout 1800000")
        return
    ad.adb.shell("settings put global stay_on_while_plugged_in 7")
    ad.adb.shell("setprop persist.enable_charging_experience false")
3106
3107
def validate_adr_rate(ad, pass_criteria):
    """Log the ADR statistics and assert that the ADR rates pass the criteria.

    Both the valid rate and the usable rate must be strictly greater than
    pass_criteria, otherwise the assertion fails.

    Args:
        ad: AndroidDevice object
        pass_criteria: (float) the passing ratio, 1 = 100%, 0.5 = 50%
    """
    adr_statistic = GnssMeasurement(ad).get_adr_static()

    ad.log.info("ADR threshold: {0:.1%}".format(pass_criteria))

    # Every entry is logged with the sponge upload prefix, in this order.
    sponge_properties = (
        "ADR_valid_rate {0:.1%}".format(adr_statistic.valid_rate),
        "ADR_usable_rate {0:.1%}".format(adr_statistic.usable_rate),
        "ADR_total_count %s" % adr_statistic.total_count,
        "ADR_valid_count %s" % adr_statistic.valid_count,
        "ADR_reset_count %s" % adr_statistic.reset_count,
        "ADR_cycle_slip_count %s" % adr_statistic.cycle_slip_count,
        "ADR_half_cycle_reported_count %s" % adr_statistic.half_cycle_reported_count,
        "ADR_half_cycle_resolved_count %s" % adr_statistic.half_cycle_resolved_count,
    )
    for sponge_property in sponge_properties:
        ad.log.info(UPLOAD_TO_SPONGE_PREFIX + sponge_property)

    valid_rate_passed = pass_criteria < adr_statistic.valid_rate
    usable_rate_passed = pass_criteria < adr_statistic.usable_rate
    asserts.assert_true(
        valid_rate_passed and usable_rate_passed,
        f"ADR valid rate: {adr_statistic.valid_rate:.1%}, "
        f"ADR usable rate: {adr_statistic.usable_rate:.1%} "
        f"Lower than expected: {pass_criteria:.1%}"
    )
3137
3138
def pair_to_wearable(watch, phone):
    """Pair the watch to the phone, trying up to 3 times.

    Args:
        watch: A wearable project.
        phone: A pixel phone.

    Returns:
        True once the bluetooth connection between watch and phone is confirmed.

    Raise:
        TestFailure: If the devices are still not connected after 3 tries.
    """
    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        process_pair(watch, phone)
        if not is_bluetooth_connected(watch, phone):
            continue
        watch.log.info("Pairing successfully.")
        return True
    raise signals.TestFailure("Pairing is not successfully.")
3154
3155
def disable_battery_defend(ad):
    """Disable battery defend config to prevent battery defend message pop up
    after connecting to the same charger for 4 days in a row.

    The setting is applied and verified up to 5 times. If the battery defender
    state never reads "DISABLED", a warning is logged; no exception is raised.

    Args:
        ad: A wearable project.
    """
    for _ in range(5):
        remount_device(ad)
        ad.adb.shell("setprop vendor.battery.defender.disable 1")
        # To simulate cable unplug and the status will be recover after device reboot.
        ad.adb.shell("cmd battery unplug")
        # Sleep 3 seconds to wait for the adb commands to change the config and
        # simulate the cable unplug.
        time.sleep(3)
        config_setting = ad.adb.shell("getprop vendor.battery.defender.state")
        if config_setting == "DISABLED":
            ad.log.info("Disable Battery Defend setting successfully.")
            break
    else:
        # Make the failure visible instead of giving up silently.
        ad.log.warning("Failed to disable Battery Defend setting after 5 tries.")
3174
3175
def restart_hal_service(ad):
    """Restart HAL service by killing the pid.

    Gets the pid of the HAL service and passes it to the kill command. Then the pid of the HAL
    service is read again to see if it changed (pid should be different after HAL restart).
    After each kill the new pid is polled up to 4 times, 1 second apart. If the pid did not
    change, the kill is retried, up to 4 times in total.

    Args:
        ad: AndroidDevice object

    Raise:
        TestFailure: if no different HAL pid shows up after 4 kill attempts.
    """
    ad.log.info("Restart HAL service")
    # Raw string so that the backslashes reach get_process_pid unchanged without relying on
    # invalid escape sequences (which Python warns about).
    hal_process_name = r"'android.hardware.gnss@[[:digit:]]\{1,2\}\.[[:digit:]]\{1,2\}-service'"
    hal_pid = get_process_pid(ad, hal_process_name)
    ad.log.info("HAL pid: %s" % hal_pid)

    # Retry kill process if the PID is the same as original one
    for _kill_attempt in range(4):
        ad.log.info("Kill HAL service")
        ad.adb.shell(f"kill -9 {hal_pid}")

        # Waits for the HAL service to restart up to 4 seconds.
        for _poll in range(4):
            new_hal_pid = get_process_pid(ad, hal_process_name)
            ad.log.info("New HAL pid: %s" % new_hal_pid)
            if new_hal_pid:
                if hal_pid != new_hal_pid:
                    return
                # Same pid as before: the service was not killed, try to kill it again.
                break
            time.sleep(1)

    # Reaching this point means none of the kill attempts produced a new pid.
    raise signals.TestFailure("HAL service can't be killed")
3207
3208
def run_ttff(ad, mode, criteria, test_cycle, base_lat_long, collect_logs=False):
    """Optionally start log collection, then run TTFF via GTW GPSTool.

    Args:
        ad: AndroidDevice object
        mode: "cs", "ws" or "hs"
        criteria: Criteria for the test.
        test_cycle: Passed through to run_ttff_via_gtw_gpstool.
        base_lat_long: Passed through to run_ttff_via_gtw_gpstool.
        collect_logs: (bool) True -> start the pixel logger and adb tcpdump first

    Returns:
        ttff_data: The result of run_ttff_via_gtw_gpstool (a dict of all TTFF data).
    """
    start_qxdm_and_tcpdump_log(ad, collect_logs)
    ttff_data = run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, base_lat_long)
    return ttff_data
3221
3222
def re_register_measurement_callback(dut):
    """Broadcast the GPSTool stop then start measurement actions.

    Each broadcast is followed by a 1 second wait.

    Args:
        dut: The device under test.
    """
    dut.log.info("Reregister measurement callback")
    broadcast_commands = (
        "am broadcast -a com.android.gpstool.stop_meas_action",
        "am broadcast -a com.android.gpstool.start_meas_action",
    )
    for command in broadcast_commands:
        dut.adb.shell(command)
        time.sleep(1)
3234
3235
def check_power_save_mode_status(ad, full_power, begin_time, brcm_error_allowlist):
    """Checks the power save mode status with the chipset specific check.

    For Broadcom:
        Gets NMEA sentences from pixel logger and retrieve the status [F, S, D].
        F,S => not in full power mode
        D => in full power mode
    For Qualcomm:
        Gets the HardwareClockDiscontinuityCount from logcat. In full power mode, the
        HardwareClockDiscontinuityCount should not be increased.

    Args:
        ad: The device under test.
        full_power: The device is in full power mode or not.
        begin_time: It is used to get the correct logcat information for qualcomm.
        brcm_error_allowlist: It is used to ignore certain error in pixel logger.
    """
    if not check_chipset_vendor_by_qualcomm(ad):
        _check_broadcom_power_save_mode(ad, full_power, brcm_error_allowlist)
        return
    _check_qualcomm_power_save_mode(ad, full_power, begin_time)
3257
3258
def _check_qualcomm_power_save_mode(ad, full_power, begin_time):
    """Asserts on the DPO count change found in logcat.

    The count is the last whitespace separated token of the log message. The
    first and the last log entries are compared: in full power mode the count
    must not change, otherwise it must have increased.

    Args:
        ad: The device under test.
        full_power: The device is in full power mode or not.
        begin_time: Passed to _get_dpo_info_from_logcat to select the logcat entries.
    """
    def _dpo_count(log_entry):
        return int(log_entry["log_message"].split()[-1])

    dpo_results = _get_dpo_info_from_logcat(ad, begin_time)
    first_dpo_count = _dpo_count(dpo_results[0])
    final_dpo_count = _dpo_count(dpo_results[-1])
    dpo_count_diff = final_dpo_count - first_dpo_count
    ad.log.debug("The DPO count diff is {diff}".format(diff=dpo_count_diff))

    if not full_power:
        asserts.assert_true(dpo_count_diff > 0, msg="DPO count diff should be more than 0")
        return
    asserts.assert_equal(dpo_count_diff, 0, msg="DPO count diff should be 0")
3269
3270
def _check_broadcom_power_save_mode(ad, full_power, brcm_error_allowlist):
    """Asserts on the power status found in the pixel logger power mode log.

    The status is the single character captured by the ",P,<status>," pattern
    in the second to last log line: "D" is expected in full power mode, "F" or
    "S" otherwise.

    Args:
        ad: The device under test.
        full_power: The device is in full power mode or not.
        brcm_error_allowlist: Passed to _get_power_mode_log_from_pixel_logger.
    """
    power_save_log, _ = _get_power_mode_log_from_pixel_logger(
        ad, brcm_error_allowlist, stop_pixel_logger=False)
    # NOTE(review): the second to last line is used, presumably because the last
    # line may be incomplete - confirm.
    status_match = re.search(r',P,(\w),', power_save_log[-2])
    power_status = status_match.group(1)
    ad.log.debug("The power status is {status}".format(status=power_status))

    if full_power:
        status_is_expected = power_status == "D"
        failure_message = "Should be in full power mode"
    else:
        status_is_expected = power_status in ["F", "S"]
        failure_message = "Should not be in full power mode"
    asserts.assert_true(status_is_expected, msg=failure_message)
3280
@contextmanager
def run_gnss_tracking(ad, criteria, meas_flag):
    """Context manager that starts GNSS tracking and stops it on exit.

    Tracking is started before the enclosed block runs. GNSS is stopped when
    the block ends, whether it ends normally or with an exception.

    Args:
        ad: The device under test.
        criteria: The criteria for First Fixed.
        meas_flag: A flag to turn on measurement log or not.
    """
    tracking_options = {"criteria": criteria, "meas_flag": meas_flag}
    process_gnss_by_gtw_gpstool(ad, **tracking_options)
    try:
        yield
    finally:
        start_gnss_by_gtw_gpstool(ad, state=False)
3295
def log_current_epoch_time(ad, sponge_key):
    """Logs the current epoch timestamp in seconds as a TestResult line.

    Args:
        ad: AndroidDevice object whose logger is used.
        sponge_key: The key name of the sponge property.
    """
    # presumably get_current_epoch_time returns milliseconds - TODO confirm
    raw_epoch_time = get_current_epoch_time()
    current_epoch_time = raw_epoch_time // 1000
    ad.log.info(f"TestResult {sponge_key} {current_epoch_time}")
3304
3305
def validate_diff_of_gps_clock_elapsed_realtime(ad, start_time):
    """Validates the diff of gps clock and elapsed realtime should be stable.

    For every clock record at or after start_time, the change of its
    gps_elapsed_realtime_diff against the previous record is computed. The
    assertion fails if any change is larger than
    _GPS_ELAPSED_REALTIME_DIFF_TOLERANCE.

    Args:
        ad: The device under test.
        start_time: When should the validation start. For BRCM devices, the PPS feature takes some
            time after first fixed to start working. Therefore we should ignore some data.
    """
    # None marks "no baseline yet", so that a genuine diff of 0 is still a valid baseline.
    last_gps_elapsed_realtime_diff = None
    variation_diff = {}

    for clock in GnssMeasurement(ad).get_gnss_clock_info():
        if clock.event_time < start_time:
            continue

        current_gps_elapsed_realtime_diff = clock.gps_elapsed_realtime_diff
        if last_gps_elapsed_realtime_diff is None:
            # The first usable record only provides the baseline.
            last_gps_elapsed_realtime_diff = current_gps_elapsed_realtime_diff
            continue

        variation_diff[clock.event_time] = abs(
            current_gps_elapsed_realtime_diff - last_gps_elapsed_realtime_diff)
        last_gps_elapsed_realtime_diff = current_gps_elapsed_realtime_diff

    over_criteria_data = [
        (event_time, diff) for (event_time, diff) in variation_diff.items()
        if diff > _GPS_ELAPSED_REALTIME_DIFF_TOLERANCE
    ]

    asserts.assert_true(
        not over_criteria_data,
        msg=f"Following data are over criteria: {over_criteria_data}",
    )
3339