1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import os 17from acts_contrib.test_utils.gnss.GnssBlankingBase import GnssBlankingBase 18from collections import namedtuple 19from acts_contrib.test_utils.gnss.LabTtffTestBase import LabTtffTestBase 20from acts_contrib.test_utils.gnss.gnss_test_utils import detect_crash_during_tracking, gnss_tracking_via_gtw_gpstool, \ 21 start_gnss_by_gtw_gpstool, process_ttff_by_gtw_gpstool, calculate_position_error 22from acts.context import get_current_context 23from acts.utils import get_current_epoch_time 24from time import sleep 25import csv 26import matplotlib.pyplot as plt 27from mpl_toolkits.mplot3d.axes3d import Axes3D 28import statistics 29 30 31class LabGnssPowerSweepTest(GnssBlankingBase): 32 33 def gnss_plot_2D_result(self, position_error): 34 """Plot 2D position error result 35 """ 36 x_axis = [] 37 y_axis = [] 38 z_axis = [] 39 for key in position_error: 40 tmp = key.split('_') 41 l1_pwr = float(tmp[1]) 42 l5_pwr = float(tmp[3]) 43 position_error_value = position_error[key] 44 x_axis.append(l1_pwr) 45 y_axis.append(l5_pwr) 46 z_axis.append(position_error_value) 47 48 fig = plt.figure(figsize=(12, 7)) 49 ax = plt.axes(projection='3d') 50 ax.scatter(x_axis, y_axis, z_axis) 51 plt.title("Z axis Position Error", fontsize=12) 52 plt.xlabel("L1 PWR (dBm)", fontsize=12) 53 plt.ylabel("L5 PWR (dBm)", fontsize=12) 54 plt.show() 55 path_name = os.path.join(self.gnss_log_path, 'result.png') 56 plt.savefig(path_name) 57 58 def gnss_wait_for_ephemeris_download(self): 59 """Launch GTW GPSTool and Clear all GNSS aiding data 60 Start GNSS tracking on GTW_GPSTool. 61 Wait for "first_wait" at simulator power = "power_level" to download Ephemeris 62 """ 63 first_wait = self.user_params.get('first_wait', 300) 64 LabTtffTestBase.start_set_gnss_power(self) 65 self.start_gnss_and_wait(first_wait) 66 67 def gnss_check_fix(self, json_tag): 68 """Launch GTW GPSTool and check position fix or not 69 Returns: 70 True : Can fix within 120 sec 71 False 72 """ 73 # Check Latitude for fix 74 self.dut.log.info("Restart GTW GPSTool in gnss_check_fix") 75 start_gnss_by_gtw_gpstool(self.dut, state=True) 76 begin_time = get_current_epoch_time() 77 if not self.dut.is_adb_logcat_on: 78 self.dut.start_adb_logcat() 79 while True: 80 if get_current_epoch_time() - begin_time >= 120000: 81 self.dut.log.info("Location fix timeout in gnss_check_fix") 82 start_gnss_by_gtw_gpstool(self.dut, state=False) 83 json_tag = json_tag + '_gnss_check_fix_timeout' 84 self.dut.cat_adb_log(tag=json_tag, 85 begin_time=begin_time, 86 end_time=None, 87 dest_path=self.gnss_log_path) 88 return False 89 sleep(1) 90 logcat_results = self.dut.search_logcat("Latitude", begin_time) 91 if logcat_results: 92 self.dut.log.info("Location fix successfully in gnss_check_fix") 93 json_tag = json_tag + '_gnss_check_fix_success' 94 self.dut.cat_adb_log(tag=json_tag, 95 begin_time=begin_time, 96 end_time=None, 97 dest_path=self.gnss_log_path) 98 return True 99 100 def gnss_check_l5_engaging(self, json_tag): 101 """check L5 engaging 102 Returns: 103 True : L5 engaged 104 False 105 """ 106 # Check L5 engaging rate 107 begin_time = get_current_epoch_time() 108 if not self.dut.is_adb_logcat_on: 109 self.dut.start_adb_logcat() 110 while True: 111 if get_current_epoch_time() - begin_time >= 120000: 112 self.dut.log.info( 113 "L5 engaging timeout in gnss_check_l5_engaging") 114 start_gnss_by_gtw_gpstool(self.dut, state=False) 115 json_tag = json_tag + '_gnss_check_l5_engaging_timeout' 116 self.dut.cat_adb_log(tag=json_tag, 117 begin_time=begin_time, 118 end_time=None, 119 dest_path=self.gnss_log_path) 120 return False 121 sleep(1) 122 logcat_results = self.dut.search_logcat("L5 engaging rate:", 123 begin_time) 124 if logcat_results: 125 start_idx = logcat_results[-1]['log_message'].find( 126 "L5 engaging rate:") 127 tmp = logcat_results[-1]['log_message'][(start_idx + 18):] 128 l5_engaging_rate = float(tmp.strip('%')) 129 130 if l5_engaging_rate != 0: 131 self.dut.log.info("L5 engaged") 132 json_tag = json_tag + '_gnss_check_l5_engaging_success' 133 self.dut.cat_adb_log(tag=json_tag, 134 begin_time=begin_time, 135 end_time=None, 136 dest_path=self.gnss_log_path) 137 return True 138 139 def gnss_check_position_error(self, json_tag): 140 """check position error 141 Returns: 142 position error average value 143 """ 144 average_position_error_count = 60 145 position_error_all = [] 146 hacc_all = [] 147 default_position_error_mean = 6666 148 default_position_error_std = 6666 149 default_hacc_mean = 6666 150 default_hacc_std = 6666 151 idx = 0 152 begin_time = get_current_epoch_time() 153 if not self.dut.is_adb_logcat_on: 154 self.dut.start_adb_logcat() 155 while True: 156 if get_current_epoch_time() - begin_time >= 120000: 157 self.dut.log.info( 158 "Position error calculation timeout in gnss_check_position_error" 159 ) 160 start_gnss_by_gtw_gpstool(self.dut, state=False) 161 json_tag = json_tag + '_gnss_check_position_error_timeout' 162 self.dut.cat_adb_log(tag=json_tag, 163 begin_time=begin_time, 164 end_time=None, 165 dest_path=self.gnss_log_path) 166 return default_position_error_mean, default_position_error_std, default_hacc_mean, default_hacc_std 167 sleep(1) 168 gnss_results = self.dut.search_logcat("GPSService: Check item", 169 begin_time) 170 if gnss_results: 171 self.dut.log.info(gnss_results[-1]["log_message"]) 172 gnss_location_log = \ 173 gnss_results[-1]["log_message"].split() 174 ttff_lat = float(gnss_location_log[8].split("=")[-1].strip(",")) 175 ttff_lon = float(gnss_location_log[9].split("=")[-1].strip(",")) 176 loc_time = int(gnss_location_log[10].split("=")[-1].strip(",")) 177 ttff_haccu = float( 178 gnss_location_log[11].split("=")[-1].strip(",")) 179 hacc_all.append(ttff_haccu) 180 position_error = calculate_position_error( 181 ttff_lat, ttff_lon, self.simulator_location) 182 position_error_all.append(abs(position_error)) 183 idx = idx + 1 184 if idx >= average_position_error_count: 185 position_error_mean = statistics.mean(position_error_all) 186 position_error_std = statistics.stdev(position_error_all) 187 hacc_mean = statistics.mean(hacc_all) 188 hacc_std = statistics.stdev(hacc_all) 189 json_tag = json_tag + '_gnss_check_position_error_success' 190 self.dut.cat_adb_log(tag=json_tag, 191 begin_time=begin_time, 192 end_time=None, 193 dest_path=self.gnss_log_path) 194 return position_error_mean, position_error_std, hacc_mean, hacc_std 195 196 def gnss_tracking_L5_position_error_capture(self, json_tag): 197 """Capture position error after L5 engaged 198 Args: 199 Returns: 200 Position error with L5 201 """ 202 self.dut.log.info('Start gnss_tracking_L5_position_error_capture') 203 fixed = self.gnss_check_fix(json_tag) 204 if fixed: 205 l5_engaged = self.gnss_check_l5_engaging(json_tag) 206 if l5_engaged: 207 position_error_mean, position_error_std, hacc_mean, hacc_std = self.gnss_check_position_error( 208 json_tag) 209 start_gnss_by_gtw_gpstool(self.dut, state=False) 210 else: 211 position_error_mean = 8888 212 position_error_std = 8888 213 hacc_mean = 8888 214 hacc_std = 8888 215 else: 216 position_error_mean = 9999 217 position_error_std = 9999 218 hacc_mean = 9999 219 hacc_std = 9999 220 self.position_fix_timeout_cnt = self.position_fix_timeout_cnt + 1 221 222 if self.position_fix_timeout_cnt > (self.l1_sweep_cnt / 2): 223 self.l1_sensitivity_point = self.current_l1_pwr 224 225 return position_error_mean, position_error_std, hacc_mean, hacc_std 226 227 def gnss_power_tracking_loop(self): 228 """Launch GTW GPSTool and Clear all GNSS aiding data 229 Start GNSS tracking on GTW_GPSTool. 230 231 Args: 232 233 Returns: 234 True: First fix TTFF are within criteria. 235 False: First fix TTFF exceed criteria. 236 """ 237 test_period = 60 238 type = 'gnss' 239 start_time = get_current_epoch_time() 240 start_gnss_by_gtw_gpstool(self.dut, state=True, type=type) 241 while get_current_epoch_time() - start_time < test_period * 1000: 242 detect_crash_during_tracking(self.dut, start_time, type) 243 stop_time = get_current_epoch_time() 244 245 return start_time, stop_time 246 247 def parse_tracking_log_cat(self, log_dir): 248 self.log.warning(f'Parsing log cat {log_dir} results into dataframe!') 249 250 def check_l5_points(self, gnss_pwr_swp): 251 cnt = 0 252 for kk in range(len(gnss_pwr_swp[1])): 253 if gnss_pwr_swp[1][kk][0] == gnss_pwr_swp[1][kk + 1][0]: 254 cnt = cnt + 1 255 else: 256 return cnt 257 258 def test_tracking_power_sweep(self): 259 # Create log file path 260 full_output_path = get_current_context().get_full_output_path() 261 self.gnss_log_path = os.path.join(full_output_path, '') 262 os.makedirs(self.gnss_log_path, exist_ok=True) 263 self.log.debug(f'Create log path: {self.gnss_log_path}') 264 csv_path = self.gnss_log_path + 'L1_L5_2D_search_result.csv' 265 csvfile = open(csv_path, 'w') 266 writer = csv.writer(csvfile) 267 writer.writerow([ 268 "csv_result_tag", "position_error_mean", "position_error_std", 269 "hacc_mean", "hacc_std" 270 ]) 271 # for L1 position fix early termination 272 self.l1_sensitivity_point = -999 273 self.enable_early_terminate = 1 274 self.position_fix_timeout_cnt = 0 275 self.current_l1_pwr = 0 276 self.l1_sweep_cnt = 0 277 278 self.gnss_wait_for_ephemeris_download() 279 l1_cable_loss = self.gnss_sim_params.get('L1_cable_loss') 280 l5_cable_loss = self.gnss_sim_params.get('L5_cable_loss') 281 282 for i, gnss_pwr_swp in enumerate(self.gnss_pwr_sweep_fine_sweep_ls): 283 self.log.info(f'Start fine GNSS power level sweep part {i + 1}') 284 self.l1_sweep_cnt = self.check_l5_points(gnss_pwr_swp) 285 for gnss_pwr_params in gnss_pwr_swp[1]: 286 json_tag = f'test_' 287 csv_result_tag = '' 288 for ii, pwr in enumerate( 289 gnss_pwr_params): # Setup L1 and L5 power 290 sat_sys = gnss_pwr_swp[0][ii].get('sat').upper() 291 band = gnss_pwr_swp[0][ii].get('band').upper() 292 if band == "L1": 293 pwr_biased = pwr + l1_cable_loss 294 if pwr != self.current_l1_pwr: 295 self.position_fix_timeout_cnt = 0 296 self.current_l1_pwr = pwr 297 elif band == "L5": 298 pwr_biased = pwr + l5_cable_loss 299 else: 300 pwr_biased = pwr 301 # Set GNSS Simulator power level 302 self.gnss_simulator.ping_inst() 303 self.gnss_simulator.set_scenario_power( 304 power_level=pwr_biased, 305 sat_system=sat_sys, 306 freq_band=band) 307 self.log.info(f'Set {sat_sys} {band} with power {pwr}') 308 json_tag = json_tag + f'{sat_sys}_{band}_{pwr}' 309 csv_result_tag = csv_result_tag + f'{band}_{pwr}_' 310 311 if self.current_l1_pwr < self.l1_sensitivity_point and self.enable_early_terminate == 1: 312 position_error_mean = -1 313 position_error_std = -1 314 hacc_mean = -1 315 hacc_std = -1 316 else: 317 position_error_mean, position_error_std, hacc_mean, hacc_std = self.gnss_tracking_L5_position_error_capture( 318 json_tag) 319 writer = csv.writer(csvfile) 320 writer.writerow([ 321 csv_result_tag, position_error_mean, position_error_std, 322 hacc_mean, hacc_std 323 ]) 324 csvfile.close() 325