1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import os
17from acts_contrib.test_utils.gnss.GnssBlankingBase import GnssBlankingBase
18from collections import namedtuple
19from acts_contrib.test_utils.gnss.LabTtffTestBase import LabTtffTestBase
20from acts_contrib.test_utils.gnss.gnss_test_utils import detect_crash_during_tracking, gnss_tracking_via_gtw_gpstool, \
21                                    start_gnss_by_gtw_gpstool, process_ttff_by_gtw_gpstool, calculate_position_error
22from acts.context import get_current_context
23from acts.utils import get_current_epoch_time
24from time import sleep
25import csv
26import matplotlib.pyplot as plt
27from mpl_toolkits.mplot3d.axes3d import Axes3D
28import statistics
29
30
31class LabGnssPowerSweepTest(GnssBlankingBase):
32
33    def gnss_plot_2D_result(self, position_error):
34        """Plot 2D position error result
35        """
36        x_axis = []
37        y_axis = []
38        z_axis = []
39        for key in position_error:
40            tmp = key.split('_')
41            l1_pwr = float(tmp[1])
42            l5_pwr = float(tmp[3])
43            position_error_value = position_error[key]
44            x_axis.append(l1_pwr)
45            y_axis.append(l5_pwr)
46            z_axis.append(position_error_value)
47
48        fig = plt.figure(figsize=(12, 7))
49        ax = plt.axes(projection='3d')
50        ax.scatter(x_axis, y_axis, z_axis)
51        plt.title("Z axis Position Error", fontsize=12)
52        plt.xlabel("L1 PWR (dBm)", fontsize=12)
53        plt.ylabel("L5 PWR (dBm)", fontsize=12)
54        plt.show()
55        path_name = os.path.join(self.gnss_log_path, 'result.png')
56        plt.savefig(path_name)
57
58    def gnss_wait_for_ephemeris_download(self):
59        """Launch GTW GPSTool and Clear all GNSS aiding data
60           Start GNSS tracking on GTW_GPSTool.
61           Wait for "first_wait" at simulator power = "power_level" to download Ephemeris
62        """
63        first_wait = self.user_params.get('first_wait', 300)
64        LabTtffTestBase.start_set_gnss_power(self)
65        self.start_gnss_and_wait(first_wait)
66
67    def gnss_check_fix(self, json_tag):
68        """Launch GTW GPSTool and check position fix or not
69          Returns:
70            True : Can fix within 120 sec
71            False
72        """
73        # Check Latitude for fix
74        self.dut.log.info("Restart GTW GPSTool in gnss_check_fix")
75        start_gnss_by_gtw_gpstool(self.dut, state=True)
76        begin_time = get_current_epoch_time()
77        if not self.dut.is_adb_logcat_on:
78            self.dut.start_adb_logcat()
79        while True:
80            if get_current_epoch_time() - begin_time >= 120000:
81                self.dut.log.info("Location fix timeout in gnss_check_fix")
82                start_gnss_by_gtw_gpstool(self.dut, state=False)
83                json_tag = json_tag + '_gnss_check_fix_timeout'
84                self.dut.cat_adb_log(tag=json_tag,
85                                     begin_time=begin_time,
86                                     end_time=None,
87                                     dest_path=self.gnss_log_path)
88                return False
89            sleep(1)
90            logcat_results = self.dut.search_logcat("Latitude", begin_time)
91            if logcat_results:
92                self.dut.log.info("Location fix successfully in gnss_check_fix")
93                json_tag = json_tag + '_gnss_check_fix_success'
94                self.dut.cat_adb_log(tag=json_tag,
95                                     begin_time=begin_time,
96                                     end_time=None,
97                                     dest_path=self.gnss_log_path)
98                return True
99
100    def gnss_check_l5_engaging(self, json_tag):
101        """check L5 engaging
102          Returns:
103            True : L5 engaged
104            False
105        """
106        # Check L5 engaging rate
107        begin_time = get_current_epoch_time()
108        if not self.dut.is_adb_logcat_on:
109            self.dut.start_adb_logcat()
110        while True:
111            if get_current_epoch_time() - begin_time >= 120000:
112                self.dut.log.info(
113                    "L5 engaging timeout in gnss_check_l5_engaging")
114                start_gnss_by_gtw_gpstool(self.dut, state=False)
115                json_tag = json_tag + '_gnss_check_l5_engaging_timeout'
116                self.dut.cat_adb_log(tag=json_tag,
117                                     begin_time=begin_time,
118                                     end_time=None,
119                                     dest_path=self.gnss_log_path)
120                return False
121            sleep(1)
122            logcat_results = self.dut.search_logcat("L5 engaging rate:",
123                                                    begin_time)
124            if logcat_results:
125                start_idx = logcat_results[-1]['log_message'].find(
126                    "L5 engaging rate:")
127                tmp = logcat_results[-1]['log_message'][(start_idx + 18):]
128                l5_engaging_rate = float(tmp.strip('%'))
129
130                if l5_engaging_rate != 0:
131                    self.dut.log.info("L5 engaged")
132                    json_tag = json_tag + '_gnss_check_l5_engaging_success'
133                    self.dut.cat_adb_log(tag=json_tag,
134                                         begin_time=begin_time,
135                                         end_time=None,
136                                         dest_path=self.gnss_log_path)
137                    return True
138
139    def gnss_check_position_error(self, json_tag):
140        """check position error
141          Returns:
142            position error average value
143        """
144        average_position_error_count = 60
145        position_error_all = []
146        hacc_all = []
147        default_position_error_mean = 6666
148        default_position_error_std = 6666
149        default_hacc_mean = 6666
150        default_hacc_std = 6666
151        idx = 0
152        begin_time = get_current_epoch_time()
153        if not self.dut.is_adb_logcat_on:
154            self.dut.start_adb_logcat()
155        while True:
156            if get_current_epoch_time() - begin_time >= 120000:
157                self.dut.log.info(
158                    "Position error calculation timeout in gnss_check_position_error"
159                )
160                start_gnss_by_gtw_gpstool(self.dut, state=False)
161                json_tag = json_tag + '_gnss_check_position_error_timeout'
162                self.dut.cat_adb_log(tag=json_tag,
163                                     begin_time=begin_time,
164                                     end_time=None,
165                                     dest_path=self.gnss_log_path)
166                return default_position_error_mean, default_position_error_std, default_hacc_mean, default_hacc_std
167            sleep(1)
168            gnss_results = self.dut.search_logcat("GPSService: Check item",
169                                                  begin_time)
170            if gnss_results:
171                self.dut.log.info(gnss_results[-1]["log_message"])
172                gnss_location_log = \
173                    gnss_results[-1]["log_message"].split()
174                ttff_lat = float(gnss_location_log[8].split("=")[-1].strip(","))
175                ttff_lon = float(gnss_location_log[9].split("=")[-1].strip(","))
176                loc_time = int(gnss_location_log[10].split("=")[-1].strip(","))
177                ttff_haccu = float(
178                    gnss_location_log[11].split("=")[-1].strip(","))
179                hacc_all.append(ttff_haccu)
180                position_error = calculate_position_error(
181                    ttff_lat, ttff_lon, self.simulator_location)
182                position_error_all.append(abs(position_error))
183                idx = idx + 1
184                if idx >= average_position_error_count:
185                    position_error_mean = statistics.mean(position_error_all)
186                    position_error_std = statistics.stdev(position_error_all)
187                    hacc_mean = statistics.mean(hacc_all)
188                    hacc_std = statistics.stdev(hacc_all)
189                    json_tag = json_tag + '_gnss_check_position_error_success'
190                    self.dut.cat_adb_log(tag=json_tag,
191                                         begin_time=begin_time,
192                                         end_time=None,
193                                         dest_path=self.gnss_log_path)
194                    return position_error_mean, position_error_std, hacc_mean, hacc_std
195
196    def gnss_tracking_L5_position_error_capture(self, json_tag):
197        """Capture position error after L5 engaged
198        Args:
199        Returns:
200            Position error with L5
201        """
202        self.dut.log.info('Start gnss_tracking_L5_position_error_capture')
203        fixed = self.gnss_check_fix(json_tag)
204        if fixed:
205            l5_engaged = self.gnss_check_l5_engaging(json_tag)
206            if l5_engaged:
207                position_error_mean, position_error_std, hacc_mean, hacc_std = self.gnss_check_position_error(
208                    json_tag)
209                start_gnss_by_gtw_gpstool(self.dut, state=False)
210            else:
211                position_error_mean = 8888
212                position_error_std = 8888
213                hacc_mean = 8888
214                hacc_std = 8888
215        else:
216            position_error_mean = 9999
217            position_error_std = 9999
218            hacc_mean = 9999
219            hacc_std = 9999
220            self.position_fix_timeout_cnt = self.position_fix_timeout_cnt + 1
221
222            if self.position_fix_timeout_cnt > (self.l1_sweep_cnt / 2):
223                self.l1_sensitivity_point = self.current_l1_pwr
224
225        return position_error_mean, position_error_std, hacc_mean, hacc_std
226
227    def gnss_power_tracking_loop(self):
228        """Launch GTW GPSTool and Clear all GNSS aiding data
229           Start GNSS tracking on GTW_GPSTool.
230
231        Args:
232
233        Returns:
234            True: First fix TTFF are within criteria.
235            False: First fix TTFF exceed criteria.
236        """
237        test_period = 60
238        type = 'gnss'
239        start_time = get_current_epoch_time()
240        start_gnss_by_gtw_gpstool(self.dut, state=True, type=type)
241        while get_current_epoch_time() - start_time < test_period * 1000:
242            detect_crash_during_tracking(self.dut, start_time, type)
243        stop_time = get_current_epoch_time()
244
245        return start_time, stop_time
246
247    def parse_tracking_log_cat(self, log_dir):
248        self.log.warning(f'Parsing log cat {log_dir} results into dataframe!')
249
250    def check_l5_points(self, gnss_pwr_swp):
251        cnt = 0
252        for kk in range(len(gnss_pwr_swp[1])):
253            if gnss_pwr_swp[1][kk][0] == gnss_pwr_swp[1][kk + 1][0]:
254                cnt = cnt + 1
255            else:
256                return cnt
257
258    def test_tracking_power_sweep(self):
259        # Create log file path
260        full_output_path = get_current_context().get_full_output_path()
261        self.gnss_log_path = os.path.join(full_output_path, '')
262        os.makedirs(self.gnss_log_path, exist_ok=True)
263        self.log.debug(f'Create log path: {self.gnss_log_path}')
264        csv_path = self.gnss_log_path + 'L1_L5_2D_search_result.csv'
265        csvfile = open(csv_path, 'w')
266        writer = csv.writer(csvfile)
267        writer.writerow([
268            "csv_result_tag", "position_error_mean", "position_error_std",
269            "hacc_mean", "hacc_std"
270        ])
271        # for L1 position fix early termination
272        self.l1_sensitivity_point = -999
273        self.enable_early_terminate = 1
274        self.position_fix_timeout_cnt = 0
275        self.current_l1_pwr = 0
276        self.l1_sweep_cnt = 0
277
278        self.gnss_wait_for_ephemeris_download()
279        l1_cable_loss = self.gnss_sim_params.get('L1_cable_loss')
280        l5_cable_loss = self.gnss_sim_params.get('L5_cable_loss')
281
282        for i, gnss_pwr_swp in enumerate(self.gnss_pwr_sweep_fine_sweep_ls):
283            self.log.info(f'Start fine GNSS power level sweep part {i + 1}')
284            self.l1_sweep_cnt = self.check_l5_points(gnss_pwr_swp)
285            for gnss_pwr_params in gnss_pwr_swp[1]:
286                json_tag = f'test_'
287                csv_result_tag = ''
288                for ii, pwr in enumerate(
289                        gnss_pwr_params):  # Setup L1 and L5 power
290                    sat_sys = gnss_pwr_swp[0][ii].get('sat').upper()
291                    band = gnss_pwr_swp[0][ii].get('band').upper()
292                    if band == "L1":
293                        pwr_biased = pwr + l1_cable_loss
294                        if pwr != self.current_l1_pwr:
295                            self.position_fix_timeout_cnt = 0
296                            self.current_l1_pwr = pwr
297                    elif band == "L5":
298                        pwr_biased = pwr + l5_cable_loss
299                    else:
300                        pwr_biased = pwr
301                    # Set GNSS Simulator power level
302                    self.gnss_simulator.ping_inst()
303                    self.gnss_simulator.set_scenario_power(
304                        power_level=pwr_biased,
305                        sat_system=sat_sys,
306                        freq_band=band)
307                    self.log.info(f'Set {sat_sys} {band} with power {pwr}')
308                    json_tag = json_tag + f'{sat_sys}_{band}_{pwr}'
309                    csv_result_tag = csv_result_tag + f'{band}_{pwr}_'
310
311                if self.current_l1_pwr < self.l1_sensitivity_point and self.enable_early_terminate == 1:
312                    position_error_mean = -1
313                    position_error_std = -1
314                    hacc_mean = -1
315                    hacc_std = -1
316                else:
317                    position_error_mean, position_error_std, hacc_mean, hacc_std = self.gnss_tracking_L5_position_error_capture(
318                        json_tag)
319                writer = csv.writer(csvfile)
320                writer.writerow([
321                    csv_result_tag, position_error_mean, position_error_std,
322                    hacc_mean, hacc_std
323                ])
324        csvfile.close()
325