1#!/usr/bin/env python3
2#
3# Copyright (C) 2021 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""
17Python module for Spirent GSS7000 GNSS simulator.
18@author: Clay Liao (jianhsiungliao@)
19"""
20from time import sleep
21import xml.etree.ElementTree as ET
22from acts.controllers import abstract_inst
23
24
25def get_xml_text(xml_string='', tag=''):
26    """Parse xml from string and return specific tag
27
28        Args:
29            xml_string: xml string,
30                Type, Str.
31            tag: tag in xml,
32                Type, Str.
33
34        Returns:
35            text: Text content in the tag
36                Type, Str.
37        """
38    if xml_string and tag:
39        root = ET.fromstring(xml_string)
40        try:
41            text = str(root.find(tag).text).rstrip().lstrip()
42        except ValueError:
43            text = 'INVALID DATA'
44    else:
45        text = 'INVALID DATA'
46    return text
47
48
49class GSS7000Error(abstract_inst.SocketInstrumentError):
50    """GSS7000 Instrument Error Class."""
51
52
53class AbstractInstGss7000(abstract_inst.SocketInstrument):
54    """Abstract instrument for  GSS7000"""
55
56    def _query(self, cmd):
57        """query instrument via Socket.
58
59        Args:
60            cmd: Command to send,
61                Type, Str.
62
63        Returns:
64            resp: Response from Instrument via Socket,
65                Type, Str.
66        """
67        self._send(cmd)
68        self._wait()
69        resp = self._recv()
70        return resp
71
72    def _wait(self, wait_time=1):
73        """wait function
74        Args:
75            wait_time: wait time in sec.
76                Type, int,
77                Default, 1.
78        """
79        sleep(wait_time)
80
81
82class GSS7000Ctrl(AbstractInstGss7000):
83    """GSS7000 control daemon class"""
84
85    def __init__(self, ip_addr, ip_port=7717):
86        """Init method for GSS7000 Control Daemon.
87
88        Args:
89            ip_addr: IP Address.
90                Type, str.
91            ip_port: TCPIP Port.
92                Type, str.
93        """
94        super().__init__(ip_addr, ip_port)
95        self.idn = 'Spirent-GSS7000 Control Daemon'
96
97    def connect(self):
98        """Init and Connect to GSS7000 Control Daemon."""
99        # Connect socket then connect socket again
100        self._close_socket()
101        self._connect_socket()
102        # Stop GSS7000 Control Daeamon Then Start
103        self._query('STOP_ENGINE')
104        self._wait()
105        self._query('START_ENGINE')
106
107    def close(self):
108        """Close GSS7000 control daemon"""
109        self._close_socket()
110        self._logger.debug('Closed connection to GSS7000 control daemon')
111
112
113class GSS7000(AbstractInstGss7000):
114    """GSS7000 Class, inherted from abstract_inst SocketInstrument."""
115
116    def __init__(self, ip_addr, engine_ip_port=15650, ctrl_ip_port=7717):
117        """Init method for GSS7000.
118
119        Args:
120            ip_addr: IP Address.
121                Type, str.
122            engine_ip_port: TCPIP Port for
123                Type, str.
124            ctrl_ip_port: TCPIP Port for Control Daemon
125        """
126        super().__init__(ip_addr, engine_ip_port)
127        self.idn = ''
128        self.connected = False
129        self.capability = []
130        self.gss7000_ctrl_daemon = GSS7000Ctrl(ip_addr, ctrl_ip_port)
131        # Close control daemon and engine sockets at the beginning
132        self.gss7000_ctrl_daemon._close_socket()
133        self._close_socket()
134
135    def connect(self):
136        """Connect GSS7000 engine daemon"""
137        # Connect control daemon socket
138        self._logger.debug('Connect to GSS7000')
139        self.gss7000_ctrl_daemon.connect()
140        # Connect to remote engine socket
141        self._wait()
142        self._connect_socket()
143        self.connected = True
144        self.get_hw_capability()
145
146    def close(self):
147        """Close GSS7000 engine daemon"""
148        # Close GSS7000 control daemon
149        self.gss7000_ctrl_daemon.close()
150        # Close GSS7000 engine daemon
151        self._close_socket()
152        self._logger.debug('Closed connection to GSS7000 engine daemon')
153
154    def _parse_hw_cap(self, xml):
155        """Parse GSS7000 hardware capability xml to list.
156            Args:
157                xml: hardware capability xml,
158                    Type, str.
159
160            Returns:
161                capability: Hardware capability dictionary
162                    Type, list.
163        """
164        root = ET.fromstring(xml)
165        capability_ls = []
166        sig_cap_list = root.find('data').find('Signal_capabilities').findall(
167            'Signal')
168        for signal in sig_cap_list:
169            value = str(signal.text).rstrip().lstrip()
170            capability_ls.extend(value.upper().split(' '))
171        return capability_ls
172
173    def get_hw_capability(self):
174        """Check GSS7000 hardware capability
175
176            Returns:
177                capability: Hardware capability dictionary,
178                    Type, list.
179        """
180        if self.connected:
181            capability_xml = self._query('GET_LICENCED_HARDWARE_CAPABILITY')
182            self.capability = self._parse_hw_cap(capability_xml)
183
184        return self.capability
185
186    def get_idn(self):
187        """Get the SimREPLAYplus Version
188
189        Returns:
190            SimREPLAYplus Version
191        """
192        idn_xml = self._query('*IDN?')
193        self.idn = get_xml_text(idn_xml, 'data')
194        return self.idn
195
196    def load_scenario(self, scenario=''):
197        """Load the scenario.
198
199        Args:
200            scenario: path of scenario,
201                Type, str
202        """
203        if scenario == '':
204            errmsg = ('Missing scenario file')
205            raise GSS7000Error(error=errmsg, command='load_scenario')
206        self._logger.debug('Stopped the original scenario')
207        self._query('-,EN,1')
208        cmd = 'SC,' + scenario
209        self._logger.debug('Loading scenario')
210        self._query(cmd)
211        self._logger.debug('Scenario is loaded')
212        return True
213
214    def start_scenario(self, scenario=''):
215        """Load and Start the running scenario.
216
217        Args:
218            scenario: path of scenario,
219                Type, str
220        """
221        if scenario:
222            if self.load_scenario(scenario):
223                self._query('RU')
224        # TODO: Need to refactor the logic design to solve the comment in ag/19222896
225        # Track the issue in b/241200605
226            else:
227                infmsg = 'No scenario is loaded. Stop running scenario'
228                self._logger.debug(infmsg)
229        else:
230            pass
231
232        if scenario:
233            infmsg = f'Started running scenario {scenario}'
234        else:
235            infmsg = 'Started running current scenario'
236
237        self._logger.debug(infmsg)
238
239    def get_scenario_name(self):
240        """Get current scenario name"""
241        sc_name_xml = self._query('SC_NAME')
242        return get_xml_text(sc_name_xml, 'data')
243
244    def stop_scenario(self):
245        """Stop the running scenario."""
246        self._query('-,EN,1')
247        self._logger.debug('Stopped running scenario')
248
249    def set_power_offset(self, ant=1, power_offset=0):
250        """Set Power Offset of GSS7000 Tx
251        Args:
252            ant: antenna number of GSS7000
253            power_offset: transmit power offset level
254                Type, float.
255                Decimal, unit [dB]
256
257        Raises:
258            GSS7000Error: raise when power offset level is not in [-49, 15] range.
259        """
260        if not -49 <= power_offset <= 15:
261            errmsg = (f'"power_offset" must be within [-49, 15], '
262                      f'current input is {power_offset}')
263            raise GSS7000Error(error=errmsg, command='set_power_offset')
264
265        cmd = f'-,POW_LEV,V1_A{ant},{power_offset},GPS,0,0,1,1,1,1,0'
266        self._query(cmd)
267
268        infmsg = f'Set veichel 1 antenna {ant} power offset: {power_offset}'
269        self._logger.debug(infmsg)
270
271    def set_ref_power(self, ref_dBm=-130):
272        """Set Ref Power of GSS7000 Tx
273        Args:
274            ref_dBm: transmit reference power level in dBm for GSS7000
275                Type, float.
276                Decimal, unit [dBm]
277
278        Raises:
279            GSS7000Error: raise when power offset level is not in [-170, -115] range.
280        """
281        if not -170 <= ref_dBm <= -115:
282            errmsg = (f'"power_offset" must be within [-170, -115], '
283                      f'current input is {ref_dBm}')
284            raise GSS7000Error(error=errmsg, command='set_ref_power')
285        cmd = f'REF_DBM,{ref_dBm:.1f}'
286        self._query(cmd)
287        infmsg = f'Set reference power level: {ref_dBm:.1f}'
288        self._logger.debug(infmsg)
289
290    def get_status(self, return_txt=False):
291        """Get current GSS7000 Status
292        Args:
293            return_txt: booling for determining the return results
294                Type, booling.
295        """
296        status_xml = self._query('NULL')
297        status = get_xml_text(status_xml, 'status')
298        if return_txt:
299            status_dict = {
300                '0': 'No Scenario loaded',
301                '1': 'Not completed loading a scenario',
302                '2': 'Idle, ready to run a scenario',
303                '3': 'Arming the scenario',
304                '4': 'Completed arming; or waiting for a command or'
305                     'trigger signal to start the scenario',
306                '5': 'Scenario running',
307                '6': 'Current scenario is paused.',
308                '7': 'Active scenario has stopped and has not been reset.'
309                     'Waiting for further commands.'
310            }
311            return status_dict.get(status)
312        return int(status)
313
314    def set_power(self, power_level=-130):
315        """Set Power Level of GSS7000 Tx
316        Args:
317            power_level: transmit power level
318                Type, float.
319                Decimal, unit [dBm]
320
321        Raises:
322            GSS7000Error: raise when power level is not in [-170, -115] range.
323        """
324        if not -170 <= power_level <= -115:
325            errmsg = (f'"power_level" must be within [-170, -115], '
326                      f'current input is {power_level}')
327            raise GSS7000Error(error=errmsg, command='set_power')
328
329        power_offset = power_level + 130
330        self.set_power_offset(1, power_offset)
331        self.set_power_offset(2, power_offset)
332
333        infmsg = f'Set GSS7000 transmit power to "{power_level:.1f}"'
334        self._logger.debug(infmsg)
335
336    def power_lev_offset_cal(self, power_level=-130, sat='GPS', band='L1'):
337        """Convert target power level to power offset for GSS7000 power setting
338        Args:
339            power_level: transmit power level
340                Type, float.
341                Decimal, unit [dBm]
342                Default. -130
343            sat_system: to set power level for all Satellites
344                Type, str
345                Option 'GPS/GLO/GAL'
346                Type, str
347            freq_band: Frequency band to set the power level
348                Type, str
349                Option 'L1/L5/B1I/B1C/B2A/E5'
350                Default, '', assumed to be L1.
351        Return:
352            power_offset: The calculated power offset for setting GSS7000 GNSS target power.
353        """
354        gss7000_tx_pwr = {
355            'GPS_L1': -130,
356            'GPS_L5': -127.9,
357            'GLONASS_F1': -131,
358            'GALILEO_L1': -127,
359            'GALILEO_E5': -122,
360            'BEIDOU_B1I': -133,
361            'BEIDOU_B1C': -130,
362            'BEIDOU_B2A': -127,
363            'QZSS_L1': -128.5,
364            'QZSS_L5': -124.9,
365            'IRNSS_L5': -130
366        }
367
368        sat_band = f'{sat}_{band}'
369        infmsg = f'Target satellite system and band: {sat_band}'
370        self._logger.debug(infmsg)
371        default_pwr_lev = gss7000_tx_pwr.get(sat_band, -130)
372        power_offset = power_level - default_pwr_lev
373        infmsg = (
374            f'Targer power: {power_level}; Default power: {default_pwr_lev};'
375            f' Power offset: {power_offset}')
376        self._logger.debug(infmsg)
377
378        return power_offset
379
380    def sat_band_convert(self, sat, band):
381        """Satellite system and operation band conversion and check.
382        Args:
383            sat: to set power level for all Satellites
384                Type, str
385                Option 'GPS/GLO/GAL/BDS'
386                Type, str
387            band: Frequency band to set the power level
388                Type, str
389                Option 'L1/L5/B1I/B1C/B2A/F1/E5'
390                Default, '', assumed to be L1.
391        """
392        sat_system_dict = {
393            'GPS': 'GPS',
394            'GLO': 'GLONASS',
395            'GAL': 'GALILEO',
396            'BDS': 'BEIDOU',
397            'IRNSS': 'IRNSS',
398            'ALL': 'GPS'
399        }
400        sat = sat_system_dict.get(sat, 'GPS')
401        if band == '':
402            infmsg = 'No band is set. Set to default band = L1'
403            self._logger.debug(infmsg)
404            band = 'L1'
405        if sat == '':
406            infmsg = 'No satellite system is set. Set to default sat = GPS'
407            self._logger.debug(infmsg)
408            sat = 'GPS'
409        sat_band = f'{sat}_{band}'
410        self._logger.debug(f'Current band: {sat_band}')
411        self._logger.debug(f'Capability: {self.capability}')
412        # Check if satellite standard and band are supported
413        # If not in support list, return GPS_L1 as default
414        if not sat_band in self.capability:
415            errmsg = (
416                f'Satellite system and band ({sat_band}) are not supported.'
417                f'The GSS7000 support list: {self.capability}')
418            raise GSS7000Error(error=errmsg, command='set_scenario_power')
419
420        sat_band_tp = tuple(sat_band.split('_'))
421
422        return sat_band_tp
423
424    def set_scenario_power(self,
425                           power_level=-130,
426                           sat_id='',
427                           sat_system='',
428                           freq_band='L1'):
429        """Set dynamic power for the running scenario.
430        Args:
431            power_level: transmit power level
432                Type, float.
433                Decimal, unit [dBm]
434                Default. -130
435            sat_id: set power level for specific satellite identifiers
436                Type, int.
437            sat_system: to set power level for specific system
438                Type, str
439                Option 'GPS/GLO/GAL/BDS'
440                Type, str
441                Default, '', assumed to be GPS.
442            freq_band: Frequency band to set the power level
443                Type, str
444                Option 'L1/L5/B1I/B1C/B2A/F1/E5'
445                Default, '', assumed to be L1.
446        Raises:
447            GSS7000Error: raise when power offset is not in [-49, -15] range.
448        """
449        band_dict = {
450            'L1': 1,
451            'L5': 2,
452            'B2A': 2,
453            'B1I': 1,
454            'B1C': 1,
455            'F1': 1,
456            'E5': 2
457        }
458
459        # Convert and check satellite system and band
460        sat, band = self.sat_band_convert(sat_system, freq_band)
461        # Get freq band setting
462        band_cmd = band_dict.get(band, 1)
463
464        # When set sat_id --> control specific SV power.
465        # When set is not set --> control all SVs of specific system power.
466        if not sat_id:
467            sat_id = 1
468            all_tx_type = 1
469        else:
470            all_tx_type = 0
471        # Convert absolute power level to absolute power offset.
472        power_offset = self.power_lev_offset_cal(power_level, sat, band)
473
474        if not -49 <= power_offset <= 15:
475            errmsg = (f'"power_offset" must be within [-49, 15], '
476                      f'current input is {power_offset}')
477            raise GSS7000Error(error=errmsg, command='set_power_offset')
478
479        # If no specific sat_system is set, the default is GPS L1.
480        if band_cmd == 1:
481            cmd = f'-,POW_LEV,v1_a1,{power_offset},{sat},{sat_id},0,0,0,1,1,{all_tx_type}'
482            self._query(cmd)
483        elif band_cmd == 2:
484            cmd = f'-,POW_LEV,v1_a2,{power_offset},{sat},{sat_id},0,0,0,1,1,{all_tx_type}'
485            self._query(cmd)
486