1"""Decorator for UWB ranging methods."""
2
3import time
4from typing import List
5from lib import uwb_ranging_params
6from mobly.controllers import android_device
7from mobly.controllers.android_device_lib import jsonrpc_client_base
8from mobly.snippet import errors
9
10CALLBACK_WAIT_TIME_SEC = 3
11STOP_CALLBACK_WAIT_TIME_SEC = 6
12
13
14class UwbRangingDecorator():
15  """Decorator for Uwb ranging methods."""
16
17  def __init__(self, ad: android_device.AndroidDevice):
18    """Initialize the ranging device.
19
20    Args:
21      ad: android device object
22
23    Usage:
24      The ranging methods should be called in the following order
25      1. open_ranging()
26      2. start_ranging()
27      3. find peer, distance measurement, aoa measurements.
28      4. stop_ranging()
29      5. close_ranging()
30    """
31    self.ad = ad
32    self._callback_keys = {}
33    self._event_handlers = {}
34    self.log = self.ad.log
35
36  def clear_ranging_session_callback_events(self, ranging_session_id: int = 0):
37    """Clear 'RangingSessionCallback' events from EventCache.
38
39    Args:
40      ranging_session_id: ranging session id.
41    """
42    handler = self._event_handlers[ranging_session_id]
43    handler.getAll("RangingSessionCallback")
44
45  def verify_callback_received(self,
46                               ranging_event: str,
47                               session: int = 0,
48                               timeout: int = CALLBACK_WAIT_TIME_SEC):
49    """Verifies if the expected callback is received.
50
51    Args:
52      ranging_event: Expected ranging event.
53      session: ranging session.
54      timeout: callback timeout.
55
56    Raises:
57      TimeoutError: if the expected callback event is not received.
58    """
59    handler = self._event_handlers[session]
60    start_time = time.time()
61    while time.time() - start_time < timeout:
62      try:
63        event = handler.waitAndGet("RangingSessionCallback", timeout=timeout)
64        event_received = event.data["rangingSessionEvent"]
65        self.ad.log.debug("Received event - %s" % event_received)
66        if event_received == ranging_event:
67          self.ad.log.debug("Received the '%s' callback in %ss" %
68                            (ranging_event, round(time.time() - start_time, 2)))
69          self.clear_ranging_session_callback_events(session)
70          return
71      except errors.CallbackHandlerTimeoutError as e:
72        self.log.warn("Failed to receive 'RangingSessionCallback' event")
73    raise TimeoutError("Failed to receive '%s' event" % ranging_event)
74
75  def open_fira_ranging(self,
76                        params: uwb_ranging_params.UwbRangingParams,
77                        session: int = 0,
78                        expect_to_succeed: bool = True):
79    """Opens fira ranging session.
80
81    Args:
82      params: UWB ranging parameters.
83      session: ranging session.
84      expect_to_succeed: Whether the session open is expected to succeed.
85    """
86    callback_key = "fira_session_%s" % session
87    handler = self.ad.uwb.openFiraRangingSession(callback_key, params.to_dict())
88    self._event_handlers[session] = handler
89    if expect_to_succeed:
90      self.verify_callback_received("Opened", session)
91      self._callback_keys[session] = callback_key
92    else:
93      self.verify_callback_received("OpenFailed", session)
94
95  def start_fira_ranging(self, session: int = 0):
96    """Starts Fira ranging session.
97
98    Args:
99      session: ranging session.
100    """
101    self.ad.uwb.startFiraRangingSession(self._callback_keys[session])
102    self.verify_callback_received("Started", session)
103
104  def reconfigure_fira_ranging(
105      self,
106      params: uwb_ranging_params.UwbRangingReconfigureParams,
107      session: int = 0):
108    """Reconfigures Fira ranging parameters.
109
110    Args:
111      params: UWB reconfigured params.
112      session: ranging session.
113    """
114    self.ad.uwb.reconfigureFiraRangingSession(self._callback_keys[session],
115                                              params.to_dict())
116    self.verify_callback_received("Reconfigured", session)
117
118  def add_controlee_fira_ranging(
119      self,
120      params: uwb_ranging_params.UwbRangingControleeParams,
121      session: int = 0,
122  ):
123    """Reconfigures Fira ranging to add controlee.
124
125    Args:
126      params: UWB controlee params.
127      session: ranging session.
128    """
129    self.ad.uwb.addControleeFiraRangingSession(
130        self._callback_keys[session], params.to_dict()
131    )
132    self.verify_callback_received("ControleeAdded", session)
133
134  def remove_controlee_fira_ranging(
135      self,
136      params: uwb_ranging_params.UwbRangingControleeParams,
137      session: int = 0,
138  ):
139    """Reconfigures Fira ranging to add controlee.
140
141    Args:
142      params: UWB controlee params.
143      session: ranging session.
144    """
145    self.ad.uwb.removeControleeFiraRangingSession(
146        self._callback_keys[session], params.to_dict()
147    )
148    self.verify_callback_received("ControleeRemoved", session)
149
150  def is_uwb_peer_found(self, addr: List[int], session: int = 0) -> bool:
151    """Verifies if the UWB peer is found.
152
153    Args:
154      addr: peer address.
155      session: ranging session.
156
157    Returns:
158      True if peer is found, False if not.
159    """
160    self.verify_callback_received("ReportReceived", session)
161    return self.ad.uwb.isUwbPeerFound(self._callback_keys[session], addr)
162
163  def get_distance_measurement(self,
164                               addr: List[int],
165                               session: int = 0) -> float:
166    """Returns distance measurement from peer.
167
168    Args:
169      addr: peer address.
170      session: ranging session.
171
172    Returns:
173      Distance measurement in float.
174
175    Raises:
176      ValueError: if the DistanceMeasurement object is null.
177    """
178    try:
179      return self.ad.uwb.getDistanceMeasurement(self._callback_keys[session],
180                                                addr)
181    except jsonrpc_client_base.ApiError as api_error:
182      raise ValueError("Failed to get distance measurement.") from api_error
183
184  def get_aoa_azimuth_measurement(self,
185                                  addr: List[int],
186                                  session: int = 0) -> float:
187    """Returns AoA azimuth measurement data from peer.
188
189    Args:
190      addr: list, peer address.
191      session: ranging session.
192
193    Returns:
194      AoA azimuth measurement in radians in float.
195
196    Raises:
197      ValueError: if the AngleMeasurement object is null.
198    """
199    try:
200      return self.ad.uwb.getAoAAzimuthMeasurement(
201          self._callback_keys[session], addr)
202    except jsonrpc_client_base.ApiError as api_error:
203      raise ValueError("Failed to get azimuth measurement.") from api_error
204
205  def get_aoa_altitude_measurement(self,
206                                   addr: List[int],
207                                   session: int = 0) -> float:
208    """Gets UWB AoA altitude measurement data.
209
210    Args:
211      addr: list, peer address.
212      session: ranging session.
213
214    Returns:
215      AoA altitude measurement in radians in float.
216
217    Raises:
218      ValueError: if the AngleMeasurement object is null.
219    """
220    try:
221      return self.ad.uwb.getAoAAltitudeMeasurement(
222          self._callback_keys[session], addr)
223    except jsonrpc_client_base.ApiError as api_error:
224      raise ValueError("Failed to get altitude measurement.") from api_error
225
226  def get_rssi_measurement(self, addr: List[int], session: int = 0) -> int:
227    """Returns RSSI measurement from both devices.
228
229    Args:
230      addr: peer address.
231      session: ranging session.
232
233    Returns:
234      RSSI measurement in int.
235
236    Raises:
237      ValueError: if the RSSI Measurement object is null.
238    """
239    try:
240      return self.ad.uwb.getRssiDbmMeasurement(self._callback_keys[session],
241                                               addr)
242    except errors.ApiError as api_error:
243      raise ValueError("Failed to get RSSI measurement.") from api_error
244
245  def stop_ranging(self, session: int = 0):
246    """Stops UWB ranging session.
247
248    Args:
249      session: ranging session.
250    """
251    self.ad.uwb.stopRangingSession(self._callback_keys[session])
252    self.verify_callback_received("Stopped", session, STOP_CALLBACK_WAIT_TIME_SEC)
253
254  def close_ranging(self, session: int = 0):
255    """Closes ranging session.
256
257    Args:
258      session: ranging session.
259    """
260    if session not in self._callback_keys:
261      return
262    self.ad.uwb.closeRangingSession(self._callback_keys[session])
263    self.verify_callback_received("Closed", session)
264    self._callback_keys.pop(session, None)
265    self._event_handlers.pop(session, None)
266
267  def close_all_ranging_sessions(self):
268    """Closes all ranging sessions.
269
270    Args:
271    """
272    for session in self._callback_keys:
273      self.clear_ranging_session_callback_events(session)
274      self.ad.uwb.closeRangingSession(self._callback_keys[session])
275      self.verify_callback_received("Closed", session)
276    self.clear_all_ranging_sessions()
277
278  def clear_all_ranging_sessions(self):
279    """Clear all ranging sessions from internal map (for ex: after reboot).
280
281    Args:
282    """
283    self._callback_keys.clear()
284    self._event_handlers.clear()
285