1#!/usr/bin/env python3
2#
3#   Copyright 2022 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import asyncio
18import json
19import logging
20from typing import Any, Mapping, Optional, Tuple
21
22from acts.controllers.amarisoft_lib import ssh_utils
23import immutabledict
24import websockets
25
# Maps each configuration type to the location of its config file, expressed
# as a suffix that is appended to the directory the /root/<cfg_type> link
# resolves to (see AmariSoftClient.set_config_file / get_config_file).
_CONFIG_DIR_MAPPING = immutabledict.immutabledict({
    'enb': '/config/enb.cfg',
    'mme': '/config/mme.cfg',
    'ims': '/config/ims.cfg',
    'mbms': '/config/mbmsgw.cfg',
    'ots': '/config/ots.cfg'
})
33
34
class MessageFailureError(Exception):
  """Raised when a message sent to the callbox fails to execute."""
37
38
class AmariSoftClient(ssh_utils.RemoteClient):
  """The SSH client class interacts with Amarisoft.

  A simulator used to simulate the base station can output different signals
  according to the network configuration settings.
  For example: T Mobile NSA LTE band 66 + NR band 71.

  On top of the remote shell access, this class exchanges JSON messages with
  the callbox over websockets, controls the `lte` systemd service and manages
  the configuration symlinks under /root.

  NOTE(review): `self.host`, `self.run_cmd` and `self.is_file_exist` are not
  defined in this class; they are expected to come from
  ssh_utils.RemoteClient. `run_cmd` is used here as returning a sequence of
  output lines.
  """

  async def _send_message_to_callbox(self, uri: str,
                                     msg: str) -> Tuple[str, str]:
    """Sends one message over a fresh websocket and collects the reply.

    The connection is opened with an 'origin: Test' header, the message is
    sent, and the next two frames received are returned. The connection is
    closed when the exchange is done.

    Args:
      uri: The uri of specific websocket interface.
      msg: The message to be sent to callbox.

    Returns:
      A (head, body) tuple holding the first and second frame received from
      the callbox.
    """
    async with websockets.connect(
        uri, extra_headers={'origin': 'Test'}) as websocket:
      await websocket.send(msg)
      head = await websocket.recv()
      body = await websocket.recv()
    return head, body

  def send_message(self, port: str, msg: str) -> Tuple[str, str]:
    """Sends a message to the callbox and blocks until it replies.

    Args:
      port: The port of specific websocket interface.
      msg: The message to be sent to callbox.

    Returns:
      The (head, body) response from callbox.
    """
    uri = f'ws://{self.host}:{port}/'
    try:
      loop = asyncio.get_event_loop()
    except RuntimeError:
      # No current event loop, e.g. when called from a non-main thread or on
      # Python versions that no longer create one implicitly.
      loop = None
    if loop is None or loop.is_closed():
      loop = asyncio.new_event_loop()
      asyncio.set_event_loop(loop)
    return loop.run_until_complete(self._send_message_to_callbox(uri, msg))

  def verify_response(self, func: str, head: str,
                      body: str) -> Tuple[Mapping[str, Any], Mapping[str, Any]]:
    """Makes sure there are no error messages in Amarisoft's response.

    If a message produces an error, response will have an error string field
    representing the error.
    For example:
      {
        "message": "ready",
        "message_id": <message id>,
        "error": <error message>,
        "type": "ENB",
        "name": <name>,
      }

    Args:
      func: The message sent to Amarisoft.
      head: Response message head, as a JSON string.
      body: Response message body, as a JSON string.

    Returns:
      A (head, body) tuple with both parts decoded from JSON.

    Raises:
       MessageFailureError: Raised when the head is not a "ready" message, when
         the body carries an "error" field, or when the body's "message" field
         does not match func.
    """
    loaded_head = json.loads(head)
    loaded_body = json.loads(body)

    if loaded_head.get('message') != 'ready':
      # Fall back to the whole head so that a missing "error" field cannot
      # mask the real failure behind a KeyError.
      reason = loaded_head.get('error', loaded_head)
      raise MessageFailureError(
          f'Fail to get response from callbox, message: {reason}')
    if 'error' in loaded_body:
      raise MessageFailureError(
          f'Fail to excute {func} with error message: {loaded_body["error"]}')
    replied = loaded_body.get('message')
    if replied != func:
      raise MessageFailureError(
          f'The message sent was {replied} instead of {func}.')
    return loaded_head, loaded_body

  def lte_service_stop(self) -> None:
    """Stops the lte service (stops to output signal)."""
    self.run_cmd('systemctl stop lte')

  def lte_service_start(self) -> None:
    """Starts the lte service (starts to output signal)."""
    self.run_cmd('systemctl start lte')

  def lte_service_restart(self) -> None:
    """Restarts the lte service (restarts to output signal)."""
    self.run_cmd('systemctl restart lte')

  def lte_service_enable(self) -> None:
    """Enables the lte service unit via `systemctl enable lte`."""
    self.run_cmd('systemctl enable lte')

  def lte_service_disable(self) -> None:
    """Disables the lte service unit via `systemctl disable lte`."""
    self.run_cmd('systemctl disable lte')

  def lte_service_is_active(self) -> bool:
    """Checks lte service is active or not.

    Returns:
      True if `systemctl is-active lte` reports the state "active", False
      otherwise (inactive, failed, activating, no output, ...).
    """
    # Match the state exactly: anything that is not "active" (including a
    # failed unit or an empty reply) must not be reported as active.
    return any(line.strip() == 'active'
               for line in self.run_cmd('systemctl is-active lte'))

  def set_config_dir(self, cfg_type: str, path: str) -> None:
    """Sets the path of target configuration file.

    Re-points the /root/<cfg_type> symlink at path unless it already resolves
    to it.

    Args:
      cfg_type: The type of target configuration. (e.g. mme, enb ...etc.)
      path: The path of target configuration. (e.g.
        /root/lteenb-linux-2020-12-14)
    """
    path_old = self.get_config_dir(cfg_type)
    if path == path_old:
      logging.info('path %s does not change.', path_old)
      return
    logging.info('set new path %s (was %s)', path, path_old)
    self.run_cmd(f'ln -sfn {path} /root/{cfg_type}')

  def get_config_dir(self, cfg_type: str) -> Optional[str]:
    """Gets the path of target configuration.

    Args:
      cfg_type: Target configuration type. (e.g. mme, enb...etc.)

    Returns:
      The target of the /root/<cfg_type> symlink, or None when readlink
      produces no output.
    """
    result = self.run_cmd(f'readlink /root/{cfg_type}')
    if not result:
      logging.warning('%s path not found.', cfg_type)
      return None
    return result[0].strip()

  def set_config_file(self, cfg_type: str, cfg_file: str) -> None:
    """Sets the configuration to be executed.

    Points <config dir>/config/<type>.cfg at cfg_file with a symlink.

    Args:
      cfg_type: The type of target configuration. (e.g. mme, enb...etc.)
      cfg_file: The configuration to be executed. (e.g.
        /root/lteenb-linux-2020-12-14/config/gnb.cfg )

    Raises:
      FileNotFoundError: Raised when the configuration directory of cfg_type
        cannot be resolved or cfg_file doesn't exist on the callbox.
      KeyError: Raised when cfg_type is not a known configuration type.
    """
    cfg_dir = self.get_config_dir(cfg_type)
    if not cfg_dir:
      # Without a directory there is no valid link location; fail clearly
      # instead of hitting a TypeError on None + str.
      raise FileNotFoundError(
          f'The configuration directory of {cfg_type} is not found')
    cfg_link = cfg_dir + _CONFIG_DIR_MAPPING[cfg_type]
    if not self.is_file_exist(cfg_file):
      raise FileNotFoundError("The command file doesn't exist")
    self.run_cmd(f'ln -sfn {cfg_file} {cfg_link}')

  def get_config_file(self, cfg_type: str) -> Optional[str]:
    """Gets the current configuration of specific configuration type.

    Args:
      cfg_type: The type of target configuration. (e.g. mme, enb...etc.)

    Returns:
      The current configuration with absolute path, or None when either the
      configuration directory or the configuration link cannot be resolved.

    Raises:
      KeyError: Raised when cfg_type is not a known configuration type.
    """
    cfg_dir = self.get_config_dir(cfg_type)
    if not cfg_dir:
      return None
    cfg_path = cfg_dir + _CONFIG_DIR_MAPPING[cfg_type]
    result = self.run_cmd(f'readlink {cfg_path}')
    if result:
      return result[0].strip()
    return None

  def get_all_config_dir(self) -> Mapping[str, Optional[str]]:
    """Gets all configuration directories.

    Returns:
      A mapping from configuration type to its directory; the value is None
      for a type whose directory cannot be resolved.
    """
    # NOTE(review): 'ims' is a key of _CONFIG_DIR_MAPPING but is not queried
    # here - confirm whether that is intentional.
    config_dir = {}
    for cfg_type in ('ots', 'enb', 'mme', 'mbms'):
      config_dir[cfg_type] = self.get_config_dir(cfg_type)
      logging.debug('get path of %s: %s', cfg_type, config_dir[cfg_type])
    return config_dir
222