"""Decorator for UWB ranging methods."""

import time
from typing import List
from lib import uwb_ranging_params
from mobly.controllers import android_device
from mobly.controllers.android_device_lib import jsonrpc_client_base
from mobly.snippet import errors

CALLBACK_WAIT_TIME_SEC = 3
STOP_CALLBACK_WAIT_TIME_SEC = 6

# RPC failures can surface as either of these classes depending on which
# snippet client raised them, so the measurement getters treat both alike.
_API_ERRORS = (jsonrpc_client_base.ApiError, errors.ApiError)


class UwbRangingDecorator:
  """Decorator for Uwb ranging methods.

  Keeps, per ranging session id, the callback key passed to the `uwb` snippet
  and the event handler returned when the session was opened.
  """

  def __init__(self, ad: android_device.AndroidDevice):
    """Initialize the ranging device.

    Args:
      ad: android device object

    Usage:
      The ranging methods should be called in the following order
      1. open_fira_ranging()
      2. start_fira_ranging()
      3. find peer, distance measurement, aoa measurements.
      4. stop_ranging()
      5. close_ranging()
    """
    self.ad = ad
    # session id -> callback key; filled only after a successful open.
    self._callback_keys = {}
    # session id -> event handler returned by openFiraRangingSession.
    self._event_handlers = {}
    self.log = self.ad.log

  def clear_ranging_session_callback_events(self, ranging_session_id: int = 0):
    """Clear 'RangingSessionCallback' events from EventCache.

    Args:
      ranging_session_id: ranging session id.

    Raises:
      KeyError: if no event handler is stored for the session.
    """
    handler = self._event_handlers[ranging_session_id]
    # The returned events are discarded on purpose.
    handler.getAll("RangingSessionCallback")

  def verify_callback_received(self,
                               ranging_event: str,
                               session: int = 0,
                               timeout: int = CALLBACK_WAIT_TIME_SEC):
    """Verifies if the expected callback is received.

    Events that do not match `ranging_event` are logged and skipped. Once the
    expected event is seen, the remaining 'RangingSessionCallback' events of
    the session are cleared.

    Args:
      ranging_event: Expected ranging event.
      session: ranging session.
      timeout: callback timeout.

    Raises:
      TimeoutError: if the expected callback event is not received.
      KeyError: if no event handler is stored for the session.
    """
    handler = self._event_handlers[session]
    start_time = time.time()
    # NOTE: each wait below is given the full `timeout`, so the total time
    # spent here can approach twice `timeout` when the last wait starts just
    # before the deadline.
    while time.time() - start_time < timeout:
      try:
        event = handler.waitAndGet("RangingSessionCallback", timeout=timeout)
      except errors.CallbackHandlerTimeoutError:
        self.log.warning("Failed to receive 'RangingSessionCallback' event")
        continue
      event_received = event.data["rangingSessionEvent"]
      self.ad.log.debug("Received event - %s", event_received)
      if event_received == ranging_event:
        self.ad.log.debug("Received the '%s' callback in %ss", ranging_event,
                          round(time.time() - start_time, 2))
        self.clear_ranging_session_callback_events(session)
        return
    raise TimeoutError("Failed to receive '%s' event" % ranging_event)

  def open_fira_ranging(self,
                        params: uwb_ranging_params.UwbRangingParams,
                        session: int = 0,
                        expect_to_succeed: bool = True):
    """Opens fira ranging session.

    Args:
      params: UWB ranging parameters.
      session: ranging session.
      expect_to_succeed: Whether the session open is expected to succeed.

    Raises:
      TimeoutError: if the expected 'Opened' / 'OpenFailed' callback is not
        received.
    """
    callback_key = "fira_session_%s" % session
    handler = self.ad.uwb.openFiraRangingSession(callback_key, params.to_dict())
    # The handler must be stored before verifying, since the verification
    # looks it up by session id.
    self._event_handlers[session] = handler
    if expect_to_succeed:
      self.verify_callback_received("Opened", session)
      # The callback key is recorded only for sessions that actually opened.
      self._callback_keys[session] = callback_key
    else:
      self.verify_callback_received("OpenFailed", session)

  def start_fira_ranging(self, session: int = 0):
    """Starts Fira ranging session.

    Args:
      session: ranging session.

    Raises:
      TimeoutError: if the 'Started' callback is not received.
    """
    self.ad.uwb.startFiraRangingSession(self._callback_keys[session])
    self.verify_callback_received("Started", session)

  def reconfigure_fira_ranging(
      self,
      params: uwb_ranging_params.UwbRangingReconfigureParams,
      session: int = 0):
    """Reconfigures Fira ranging parameters.

    Args:
      params: UWB reconfigured params.
      session: ranging session.

    Raises:
      TimeoutError: if the 'Reconfigured' callback is not received.
    """
    self.ad.uwb.reconfigureFiraRangingSession(self._callback_keys[session],
                                              params.to_dict())
    self.verify_callback_received("Reconfigured", session)

  def add_controlee_fira_ranging(
      self,
      params: uwb_ranging_params.UwbRangingControleeParams,
      session: int = 0,
  ):
    """Reconfigures Fira ranging to add controlee.

    Args:
      params: UWB controlee params.
      session: ranging session.

    Raises:
      TimeoutError: if the 'ControleeAdded' callback is not received.
    """
    self.ad.uwb.addControleeFiraRangingSession(
        self._callback_keys[session], params.to_dict()
    )
    self.verify_callback_received("ControleeAdded", session)

  def remove_controlee_fira_ranging(
      self,
      params: uwb_ranging_params.UwbRangingControleeParams,
      session: int = 0,
  ):
    """Reconfigures Fira ranging to remove controlee.

    Args:
      params: UWB controlee params.
      session: ranging session.

    Raises:
      TimeoutError: if the 'ControleeRemoved' callback is not received.
    """
    self.ad.uwb.removeControleeFiraRangingSession(
        self._callback_keys[session], params.to_dict()
    )
    self.verify_callback_received("ControleeRemoved", session)

  def is_uwb_peer_found(self, addr: List[int], session: int = 0) -> bool:
    """Verifies if the UWB peer is found.

    Waits for a 'ReportReceived' callback before querying the device.

    Args:
      addr: peer address.
      session: ranging session.

    Returns:
      True if peer is found, False if not.

    Raises:
      TimeoutError: if the 'ReportReceived' callback is not received.
    """
    self.verify_callback_received("ReportReceived", session)
    return self.ad.uwb.isUwbPeerFound(self._callback_keys[session], addr)

  def get_distance_measurement(self,
                               addr: List[int],
                               session: int = 0) -> float:
    """Returns distance measurement from peer.

    Args:
      addr: peer address.
      session: ranging session.

    Returns:
      Distance measurement in float.

    Raises:
      ValueError: if the DistanceMeasurement object is null.
    """
    try:
      return self.ad.uwb.getDistanceMeasurement(self._callback_keys[session],
                                                addr)
    except _API_ERRORS as api_error:
      raise ValueError("Failed to get distance measurement.") from api_error

  def get_aoa_azimuth_measurement(self,
                                  addr: List[int],
                                  session: int = 0) -> float:
    """Returns AoA azimuth measurement data from peer.

    Args:
      addr: list, peer address.
      session: ranging session.

    Returns:
      AoA azimuth measurement in radians in float.

    Raises:
      ValueError: if the AngleMeasurement object is null.
    """
    try:
      return self.ad.uwb.getAoAAzimuthMeasurement(
          self._callback_keys[session], addr)
    except _API_ERRORS as api_error:
      raise ValueError("Failed to get azimuth measurement.") from api_error

  def get_aoa_altitude_measurement(self,
                                   addr: List[int],
                                   session: int = 0) -> float:
    """Gets UWB AoA altitude measurement data.

    Args:
      addr: list, peer address.
      session: ranging session.

    Returns:
      AoA altitude measurement in radians in float.

    Raises:
      ValueError: if the AngleMeasurement object is null.
    """
    try:
      return self.ad.uwb.getAoAAltitudeMeasurement(
          self._callback_keys[session], addr)
    except _API_ERRORS as api_error:
      raise ValueError("Failed to get altitude measurement.") from api_error

  def get_rssi_measurement(self, addr: List[int], session: int = 0) -> int:
    """Returns RSSI measurement from both devices.

    Args:
      addr: peer address.
      session: ranging session.

    Returns:
      RSSI measurement in int.

    Raises:
      ValueError: if the RSSI Measurement object is null.
    """
    try:
      return self.ad.uwb.getRssiDbmMeasurement(self._callback_keys[session],
                                               addr)
    except _API_ERRORS as api_error:
      raise ValueError("Failed to get RSSI measurement.") from api_error

  def stop_ranging(self, session: int = 0):
    """Stops UWB ranging session.

    Uses the longer STOP_CALLBACK_WAIT_TIME_SEC timeout for the callback.

    Args:
      session: ranging session.

    Raises:
      TimeoutError: if the 'Stopped' callback is not received.
    """
    self.ad.uwb.stopRangingSession(self._callback_keys[session])
    self.verify_callback_received("Stopped", session, STOP_CALLBACK_WAIT_TIME_SEC)

  def close_ranging(self, session: int = 0):
    """Closes ranging session.

    Does nothing when the session has no stored callback key (it was never
    opened successfully, or it is already closed).

    Args:
      session: ranging session.

    Raises:
      TimeoutError: if the 'Closed' callback is not received.
    """
    if session not in self._callback_keys:
      return
    self.ad.uwb.closeRangingSession(self._callback_keys[session])
    self.verify_callback_received("Closed", session)
    self._callback_keys.pop(session, None)
    self._event_handlers.pop(session, None)

  def close_all_ranging_sessions(self):
    """Closes all ranging sessions.

    Pending callback events of each session are cleared before it is closed,
    and the internal session maps are emptied once every session has closed.

    Raises:
      TimeoutError: if a 'Closed' callback is not received; the sessions not
        yet processed are then left untouched.
    """
    for session in self._callback_keys:
      self.clear_ranging_session_callback_events(session)
      self.ad.uwb.closeRangingSession(self._callback_keys[session])
      self.verify_callback_received("Closed", session)
    self.clear_all_ranging_sessions()

  def clear_all_ranging_sessions(self):
    """Clear all ranging sessions from internal map (for ex: after reboot).

    Only the local bookkeeping is dropped; no call is made to the device.
    """
    self._callback_keys.clear()
    self._event_handlers.clear()