#!/usr/bin/env python3
#
#   Copyright (C) 2021 The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
Python module for Spirent GSS7000 GNSS simulator.
@author: Clay Liao (jianhsiungliao@)
"""
from time import sleep
import xml.etree.ElementTree as ET
from acts.controllers import abstract_inst


def get_xml_text(xml_string='', tag=''):
    """Parse an XML string and return the stripped text of one tag.

    Args:
        xml_string: XML document to parse.
            Type, str.
        tag: tag (ElementTree path) to look up below the root element.
            Type, str.

    Returns:
        text: Text content of the first matching tag with surrounding
            whitespace removed; '' when the tag exists but has no text;
            'INVALID DATA' when either argument is empty, the XML cannot be
            parsed, or the tag is not found.
            Type, str.
    """
    if not (xml_string and tag):
        return 'INVALID DATA'
    try:
        element = ET.fromstring(xml_string).find(tag)
    except ET.ParseError:
        # A malformed response is reported with the same sentinel as any
        # other unusable input instead of leaking a parser exception.
        return 'INVALID DATA'
    if element is None:
        # find() returns None (it does not raise) when the tag is missing.
        return 'INVALID DATA'
    # element.text is None for an empty element; do not stringify it to 'None'.
    return (element.text or '').strip()


class GSS7000Error(abstract_inst.SocketInstrumentError):
    """GSS7000 Instrument Error Class."""


class AbstractInstGss7000(abstract_inst.SocketInstrument):
    """Abstract instrument for GSS7000"""

    def _query(self, cmd):
        """Send a command over the socket, wait, then read the response.

        Args:
            cmd: Command to send,
                Type, str.

        Returns:
            resp: Response from Instrument via Socket,
                Type, str.
        """
        self._send(cmd)
        # Pause (default wait time) before reading the reply.
        self._wait()
        resp = self._recv()
        return resp

    def _wait(self, wait_time=1):
        """Block the caller for the given time.

        Args:
            wait_time: wait time in sec.
                Type, int,
                Default, 1.
        """
        sleep(wait_time)


class GSS7000Ctrl(AbstractInstGss7000):
    """GSS7000 control daemon class"""

    def __init__(self, ip_addr, ip_port=7717):
        """Init method for GSS7000 Control Daemon.

        Args:
            ip_addr: IP Address.
                Type, str.
            ip_port: TCPIP Port.
                Default, 7717.
        """
        super().__init__(ip_addr, ip_port)
        self.idn = 'Spirent-GSS7000 Control Daemon'

    def connect(self):
        """Init and Connect to GSS7000 Control Daemon."""
        # Close the socket first, then connect again
        self._close_socket()
        self._connect_socket()
        # Stop GSS7000 Control Daemon, then start it
        self._query('STOP_ENGINE')
        self._wait()
        self._query('START_ENGINE')

    def close(self):
        """Close GSS7000 control daemon"""
        self._close_socket()
        self._logger.debug('Closed connection to GSS7000 control daemon')


class GSS7000(AbstractInstGss7000):
    """GSS7000 Class, inherited from abstract_inst SocketInstrument."""

    def __init__(self, ip_addr, engine_ip_port=15650, ctrl_ip_port=7717):
        """Init method for GSS7000.

        Args:
            ip_addr: IP Address.
                Type, str.
            engine_ip_port: TCPIP Port for the engine socket.
                Default, 15650.
            ctrl_ip_port: TCPIP Port for Control Daemon.
                Default, 7717.
        """
        super().__init__(ip_addr, engine_ip_port)
        self.idn = ''
        self.connected = False
        self.capability = []
        self.gss7000_ctrl_daemon = GSS7000Ctrl(ip_addr, ctrl_ip_port)
        # Close control daemon and engine sockets at the beginning
        self.gss7000_ctrl_daemon._close_socket()
        self._close_socket()

    def connect(self):
        """Connect GSS7000 engine daemon"""
        # Connect control daemon socket
        self._logger.debug('Connect to GSS7000')
        self.gss7000_ctrl_daemon.connect()
        # Connect to remote engine socket
        self._wait()
        self._connect_socket()
        self.connected = True
        self.get_hw_capability()

    def close(self):
        """Close GSS7000 engine daemon"""
        # Close GSS7000 control daemon
        self.gss7000_ctrl_daemon.close()
        # Close GSS7000 engine daemon
        self._close_socket()
        # Mark the instrument as disconnected so get_hw_capability() does not
        # query a closed socket afterwards.
        self.connected = False
        self._logger.debug('Closed connection to GSS7000 engine daemon')

    def _parse_hw_cap(self, xml):
        """Parse GSS7000 hardware capability xml to list.

        Args:
            xml: hardware capability xml,
                Type, str.

        Returns:
            capability: Upper-cased, whitespace-separated tokens collected
                from every 'Signal' element under 'data/Signal_capabilities';
                an empty list when that section is missing.
                Type, list.
        """
        root = ET.fromstring(xml)
        sig_caps = root.find('data/Signal_capabilities')
        if sig_caps is None:
            return []
        capability_ls = []
        for signal in sig_caps.findall('Signal'):
            # split() without a separator collapses repeated whitespace, so
            # no empty-string entries end up in the capability list; an
            # element without text contributes nothing.
            capability_ls.extend((signal.text or '').upper().split())
        return capability_ls

    def get_hw_capability(self):
        """Check GSS7000 hardware capability

        The capability is only refreshed while the instrument is connected;
        otherwise the last known value is returned.

        Returns:
            capability: Hardware capability list,
                Type, list.
        """
        if self.connected:
            capability_xml = self._query('GET_LICENCED_HARDWARE_CAPABILITY')
            self.capability = self._parse_hw_cap(capability_xml)

        return self.capability

    def get_idn(self):
        """Query '*IDN?' and cache the 'data' text of the reply in self.idn.

        Returns:
            The identification text reported by the instrument.
        """
        idn_xml = self._query('*IDN?')
        self.idn = get_xml_text(idn_xml, 'data')
        return self.idn

    def load_scenario(self, scenario=''):
        """Load the scenario.

        Args:
            scenario: path of scenario,
                Type, str

        Returns:
            True once the load command has been sent.

        Raises:
            GSS7000Error: raise when no scenario is given.
        """
        if not scenario:
            errmsg = 'Missing scenario file'
            raise GSS7000Error(error=errmsg, command='load_scenario')
        # End the currently running scenario before loading a new one.
        self._query('-,EN,1')
        self._logger.debug('Stopped the original scenario')
        cmd = 'SC,' + scenario
        self._logger.debug('Loading scenario')
        self._query(cmd)
        self._logger.debug('Scenario is loaded')
        return True

    def start_scenario(self, scenario=''):
        """Load and Start the running scenario.

        Args:
            scenario: path of scenario,
                Type, str
        """
        if scenario:
            if not self.load_scenario(scenario):
                # Do not also report the scenario as started when loading
                # did not succeed.
                infmsg = 'No scenario is loaded. Stop running scenario'
                self._logger.debug(infmsg)
                return
            self._query('RU')
            infmsg = f'Started running scenario {scenario}'
        else:
            # TODO: Need to refactor the logic design to solve the comment in ag/19222896
            # Track the issue in b/241200605
            # NOTE(review): no command is sent on this path; only the message
            # below is logged. Confirm whether 'RU' should be issued here.
            infmsg = 'Started running current scenario'

        self._logger.debug(infmsg)

    def get_scenario_name(self):
        """Get current scenario name"""
        sc_name_xml = self._query('SC_NAME')
        return get_xml_text(sc_name_xml, 'data')

    def stop_scenario(self):
        """Stop the running scenario."""
        self._query('-,EN,1')
        self._logger.debug('Stopped running scenario')

    def set_power_offset(self, ant=1, power_offset=0):
        """Set Power Offset of GSS7000 Tx

        Args:
            ant: antenna number of GSS7000
            power_offset: transmit power offset level
                Type, float.
                Decimal, unit [dB]

        Raises:
            GSS7000Error: raise when power offset level is not in [-49, 15] range.
        """
        if not -49 <= power_offset <= 15:
            errmsg = (f'"power_offset" must be within [-49, 15], '
                      f'current input is {power_offset}')
            raise GSS7000Error(error=errmsg, command='set_power_offset')

        cmd = f'-,POW_LEV,V1_A{ant},{power_offset},GPS,0,0,1,1,1,1,0'
        self._query(cmd)

        infmsg = f'Set veichel 1 antenna {ant} power offset: {power_offset}'
        self._logger.debug(infmsg)

    def set_ref_power(self, ref_dBm=-130):
        """Set Ref Power of GSS7000 Tx

        Args:
            ref_dBm: transmit reference power level in dBm for GSS7000
                Type, float.
                Decimal, unit [dBm]

        Raises:
            GSS7000Error: raise when reference power level is not in
                [-170, -115] range.
        """
        if not -170 <= ref_dBm <= -115:
            errmsg = (f'"ref_dBm" must be within [-170, -115], '
                      f'current input is {ref_dBm}')
            raise GSS7000Error(error=errmsg, command='set_ref_power')
        cmd = f'REF_DBM,{ref_dBm:.1f}'
        self._query(cmd)
        infmsg = f'Set reference power level: {ref_dBm:.1f}'
        self._logger.debug(infmsg)

    def get_status(self, return_txt=False):
        """Get current GSS7000 Status

        Args:
            return_txt: whether to return the status description instead of
                the numeric status code.
                Type, bool.

        Returns:
            The status code as int, or, when return_txt is set, the
            description of the status (None for an unknown code).

        Raises:
            ValueError: when return_txt is not set and the reported status
                is not an integer.
        """
        status_xml = self._query('NULL')
        status = get_xml_text(status_xml, 'status')
        if return_txt:
            status_dict = {
                '0': 'No Scenario loaded',
                '1': 'Not completed loading a scenario',
                '2': 'Idle, ready to run a scenario',
                '3': 'Arming the scenario',
                '4': 'Completed arming; or waiting for a command or '
                     'trigger signal to start the scenario',
                '5': 'Scenario running',
                '6': 'Current scenario is paused.',
                '7': 'Active scenario has stopped and has not been reset. '
                     'Waiting for further commands.'
            }
            return status_dict.get(status)
        return int(status)

    def set_power(self, power_level=-130):
        """Set Power Level of GSS7000 Tx

        The level is converted to an offset relative to -130 dBm and applied
        to antenna 1 and antenna 2.

        Args:
            power_level: transmit power level
                Type, float.
                Decimal, unit [dBm]

        Raises:
            GSS7000Error: raise when power level is not in [-170, -115] range.
        """
        if not -170 <= power_level <= -115:
            errmsg = (f'"power_level" must be within [-170, -115], '
                      f'current input is {power_level}')
            raise GSS7000Error(error=errmsg, command='set_power')

        power_offset = power_level + 130
        self.set_power_offset(1, power_offset)
        self.set_power_offset(2, power_offset)

        infmsg = f'Set GSS7000 transmit power to "{power_level:.1f}"'
        self._logger.debug(infmsg)

    def power_lev_offset_cal(self, power_level=-130, sat='GPS', band='L1'):
        """Convert target power level to power offset for GSS7000 power setting

        Args:
            power_level: transmit power level
                Type, float.
                Decimal, unit [dBm]
                Default. -130
            sat: satellite system name as used in the table below,
                e.g. 'GPS', 'GLONASS', 'GALILEO', 'BEIDOU', 'QZSS', 'IRNSS'.
                Type, str
                Default, 'GPS'.
            band: frequency band, e.g. 'L1', 'L5', 'F1', 'E5', 'B1I', 'B1C',
                'B2A'.
                Type, str
                Default, 'L1'.

        Returns:
            power_offset: The calculated power offset for setting GSS7000
                GNSS target power. A system/band pair missing from the table
                is computed against -130.
        """
        gss7000_tx_pwr = {
            'GPS_L1': -130,
            'GPS_L5': -127.9,
            'GLONASS_F1': -131,
            'GALILEO_L1': -127,
            'GALILEO_E5': -122,
            'BEIDOU_B1I': -133,
            'BEIDOU_B1C': -130,
            'BEIDOU_B2A': -127,
            'QZSS_L1': -128.5,
            'QZSS_L5': -124.9,
            'IRNSS_L5': -130
        }

        sat_band = f'{sat}_{band}'
        infmsg = f'Target satellite system and band: {sat_band}'
        self._logger.debug(infmsg)
        default_pwr_lev = gss7000_tx_pwr.get(sat_band, -130)
        power_offset = power_level - default_pwr_lev
        infmsg = (
            f'Targer power: {power_level}; Default power: {default_pwr_lev};'
            f' Power offset: {power_offset}')
        self._logger.debug(infmsg)

        return power_offset

    def sat_band_convert(self, sat, band):
        """Satellite system and operation band conversion and check.

        Args:
            sat: satellite system abbreviation
                Type, str
                Option 'GPS/GLO/GAL/BDS/IRNSS/ALL'
                Empty or unknown values are treated as GPS.
            band: Frequency band
                Type, str
                Option 'L1/L5/B1I/B1C/B2A/F1/E5'
                Default, '', assumed to be L1.

        Returns:
            Tuple obtained by splitting '<system>_<band>' on '_', i.e.
            (system, band) for the listed options.

        Raises:
            GSS7000Error: raise when the system/band pair is not listed in
                the hardware capability.
        """
        sat_system_dict = {
            'GPS': 'GPS',
            'GLO': 'GLONASS',
            'GAL': 'GALILEO',
            'BDS': 'BEIDOU',
            'IRNSS': 'IRNSS',
            'ALL': 'GPS'
        }
        # Report the fallback before the lookup: the lookup itself already
        # maps an empty value to GPS, which would hide the missing input.
        if not sat:
            infmsg = 'No satellite system is set. Set to default sat = GPS'
            self._logger.debug(infmsg)
        sat = sat_system_dict.get(sat, 'GPS')
        if not band:
            infmsg = 'No band is set. Set to default band = L1'
            self._logger.debug(infmsg)
            band = 'L1'
        sat_band = f'{sat}_{band}'
        self._logger.debug(f'Current band: {sat_band}')
        self._logger.debug(f'Capability: {self.capability}')
        # Check if satellite standard and band are supported;
        # an unsupported combination is rejected with an error.
        if sat_band not in self.capability:
            errmsg = (
                f'Satellite system and band ({sat_band}) are not supported. '
                f'The GSS7000 support list: {self.capability}')
            raise GSS7000Error(error=errmsg, command='set_scenario_power')

        sat_band_tp = tuple(sat_band.split('_'))

        return sat_band_tp

    def set_scenario_power(self,
                           power_level=-130,
                           sat_id='',
                           sat_system='',
                           freq_band='L1'):
        """Set dynamic power for the running scenario.

        Args:
            power_level: transmit power level
                Type, float.
                Decimal, unit [dBm]
                Default. -130
            sat_id: set power level for specific satellite identifiers
                Type, int.
                Default, '', the level is applied to all satellites of the
                selected system.
            sat_system: to set power level for specific system
                Type, str
                Option 'GPS/GLO/GAL/BDS'
                Default, '', assumed to be GPS.
            freq_band: Frequency band to set the power level
                Type, str
                Option 'L1/L5/B1I/B1C/B2A/F1/E5'
                Default, 'L1'; '' is also treated as L1.

        Raises:
            GSS7000Error: raise when power offset is not in [-49, 15] range,
                or when the system/band pair is not supported.
        """
        # Antenna index used in the command for each band.
        band_dict = {
            'L1': 1,
            'L5': 2,
            'B2A': 2,
            'B1I': 1,
            'B1C': 1,
            'F1': 1,
            'E5': 2
        }

        # Convert and check satellite system and band
        sat, band = self.sat_band_convert(sat_system, freq_band)
        # Get freq band setting (antenna 1 for bands not in the table)
        band_cmd = band_dict.get(band, 1)

        # When sat_id is set --> control specific SV power.
        # When sat_id is not set --> control all SVs of specific system power.
        if not sat_id:
            sat_id = 1
            all_tx_type = 1
        else:
            all_tx_type = 0
        # Convert absolute power level to absolute power offset.
        power_offset = self.power_lev_offset_cal(power_level, sat, band)

        if not -49 <= power_offset <= 15:
            errmsg = (f'"power_offset" must be within [-49, 15], '
                      f'current input is {power_offset}')
            raise GSS7000Error(error=errmsg, command='set_scenario_power')

        # band_cmd is always 1 or 2 here, so it selects antenna a1 or a2.
        cmd = (f'-,POW_LEV,v1_a{band_cmd},{power_offset},{sat},{sat_id},'
               f'0,0,0,1,1,{all_tx_type}')
        self._query(cmd)