1#!/usr/bin/env python3 2# 3# Copyright 2022 - Google 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import asyncio 18import json 19import logging 20from typing import Any, Mapping, Optional, Tuple 21 22from acts.controllers.amarisoft_lib import ssh_utils 23import immutabledict 24import websockets 25 26_CONFIG_DIR_MAPPING = immutabledict.immutabledict({ 27 'enb': '/config/enb.cfg', 28 'mme': '/config/mme.cfg', 29 'ims': '/config/ims.cfg', 30 'mbms': '/config/mbmsgw.cfg', 31 'ots': '/config/ots.cfg' 32}) 33 34 35class MessageFailureError(Exception): 36 """Raises an error when the message execution fail.""" 37 38 39class AmariSoftClient(ssh_utils.RemoteClient): 40 """The SSH client class interacts with Amarisoft. 41 42 A simulator used to simulate the base station can output different signals 43 according to the network configuration settings. 44 For example: T Mobile NSA LTE band 66 + NR band 71. 45 """ 46 47 async def _send_message_to_callbox(self, uri: str, 48 msg: str) -> Tuple[str, str]: 49 """Implements async function for send message to the callbox. 50 51 Args: 52 uri: The uri of specific websocket interface. 53 msg: The message to be send to callbox. 54 55 Returns: 56 The response from callbox. 57 """ 58 async with websockets.connect( 59 uri, extra_headers={'origin': 'Test'}) as websocket: 60 await websocket.send(msg) 61 head = await websocket.recv() 62 body = await websocket.recv() 63 return head, body 64 65 def send_message(self, port: str, msg: str) -> Tuple[str, str]: 66 """Sends a message to the callbox. 67 68 Args: 69 port: The port of specific websocket interface. 70 msg: The message to be send to callbox. 71 72 Returns: 73 The response from callbox. 74 """ 75 return asyncio.get_event_loop().run_until_complete( 76 self._send_message_to_callbox(f'ws://{self.host}:{port}/', msg)) 77 78 def verify_response(self, func: str, head: str, 79 body: str) -> Tuple[Mapping[str, Any], Mapping[str, Any]]: 80 """Makes sure there are no error messages in Amarisoft's response. 81 82 If a message produces an error, response will have an error string field 83 representing the error. 84 For example: 85 { 86 "message": "ready", 87 "message_id": <message id>, 88 "error": <error message>, 89 "type": "ENB", 90 "name: <name>, 91 } 92 93 Args: 94 func: The message send to Amarisoft. 95 head: Responsed message head. 96 body: Responsed message body. 97 98 Returns: 99 Standard output of the shell command. 100 101 Raises: 102 MessageFailureError: Raised when an error occurs in the response message. 103 """ 104 loaded_head = json.loads(head) 105 loaded_body = json.loads(body) 106 107 if loaded_head.get('message') != 'ready': 108 raise MessageFailureError( 109 f'Fail to get response from callbox, message: {loaded_head["error"]}') 110 if 'error' in loaded_body: 111 raise MessageFailureError( 112 f'Fail to excute {func} with error message: {loaded_body["error"]}') 113 if loaded_body.get('message') != func: 114 raise MessageFailureError( 115 f'The message sent was {loaded_body["message"]} instead of {func}.') 116 return loaded_head, loaded_body 117 118 def lte_service_stop(self) -> None: 119 """Stops to output signal.""" 120 self.run_cmd('systemctl stop lte') 121 122 def lte_service_start(self): 123 """Starts to output signal.""" 124 self.run_cmd('systemctl start lte') 125 126 def lte_service_restart(self): 127 """Restarts to output signal.""" 128 self.run_cmd('systemctl restart lte') 129 130 def lte_service_enable(self): 131 """lte service remains enable until next reboot.""" 132 self.run_cmd('systemctl enable lte') 133 134 def lte_service_disable(self): 135 """lte service remains disable until next reboot.""" 136 self.run_cmd('systemctl disable lte') 137 138 def lte_service_is_active(self) -> bool: 139 """Checks lte service is active or not. 140 141 Returns: 142 True if service active, False otherwise. 143 """ 144 return not any('inactive' in line 145 for line in self.run_cmd('systemctl is-active lte')) 146 147 def set_config_dir(self, cfg_type: str, path: str) -> None: 148 """Sets the path of target configuration file. 149 150 Args: 151 cfg_type: The type of target configuration. (e.g. mme, enb ...etc.) 152 path: The path of target configuration. (e.g. 153 /root/lteenb-linux-2020-12-14) 154 """ 155 path_old = self.get_config_dir(cfg_type) 156 if path != path_old: 157 logging.info('set new path %s (was %s)', path, path_old) 158 self.run_cmd(f'ln -sfn {path} /root/{cfg_type}') 159 else: 160 logging.info('path %s does not change.', path_old) 161 162 def get_config_dir(self, cfg_type: str) -> Optional[str]: 163 """Gets the path of target configuration. 164 165 Args: 166 cfg_type: Target configuration type. (e.g. mme, enb...etc.) 167 168 Returns: 169 The path of configuration. 170 """ 171 result = self.run_cmd(f'readlink /root/{cfg_type}') 172 if result: 173 path = result[0].strip() 174 else: 175 logging.warning('%s path not found.', cfg_type) 176 return None 177 return path 178 179 def set_config_file(self, cfg_type: str, cfg_file: str) -> None: 180 """Sets the configuration to be executed. 181 182 Args: 183 cfg_type: The type of target configuration. (e.g. mme, enb...etc.) 184 cfg_file: The configuration to be executed. (e.g. 185 /root/lteenb-linux-2020-12-14/config/gnb.cfg ) 186 187 Raises: 188 FileNotFoundError: Raised when a file or directory is requested but 189 doesn’t exist. 190 """ 191 cfg_link = self.get_config_dir(cfg_type) + _CONFIG_DIR_MAPPING[cfg_type] 192 if not self.is_file_exist(cfg_file): 193 raise FileNotFoundError("The command file doesn't exist") 194 self.run_cmd(f'ln -sfn {cfg_file} {cfg_link}') 195 196 def get_config_file(self, cfg_type: str) -> Optional[str]: 197 """Gets the current configuration of specific configuration type. 198 199 Args: 200 cfg_type: The type of target configuration. (e.g. mme, enb...etc.) 201 202 Returns: 203 The current configuration with absolute path. 204 """ 205 cfg_path = self.get_config_dir(cfg_type) + _CONFIG_DIR_MAPPING[cfg_type] 206 if cfg_path: 207 result = self.run_cmd(f'readlink {cfg_path}') 208 if result: 209 return result[0].strip() 210 211 def get_all_config_dir(self) -> Mapping[str, str]: 212 """Gets all configuration directions. 213 214 Returns: 215 All configuration directions. 216 """ 217 config_dir = {} 218 for cfg_type in ('ots', 'enb', 'mme', 'mbms'): 219 config_dir[cfg_type] = self.get_config_dir(cfg_type) 220 logging.debug('get path of %s: %s', cfg_type, config_dir[cfg_type]) 221 return config_dir 222