1"""BLE test utils for netsim."""
2
3import logging
4import time
5from typing import Any
6
7from mobly import asserts
8from mobly import utils
9from mobly.controllers import android_device
10from mobly.snippet import callback_event
11
12
# Number of seconds for the target to stay BLE advertising.
ADVERTISING_TIME = 120
# Number of seconds for the target to start BLE advertising.
ADVERTISING_START_TIME = 30
# The number of seconds to wait for receiving scan results.
SCAN_TIMEOUT = 20
# The number of seconds to wait for a connection to be established.
CONNECTION_TIMEOUT = 60
# The number of seconds to wait before cancelling a connection.
CANCEL_CONNECTION_WAIT_TIME = 0.1
# UUID for test service.
TEST_BLE_SERVICE_UUID = '0000fe23-0000-1000-8000-00805f9b34fb'
# UUID for write characteristic.
TEST_WRITE_UUID = '0000e632-0000-1000-8000-00805f9b34fb'
# UUID for second write characteristic.
TEST_SECOND_WRITE_UUID = '0000e633-0000-1000-8000-00805f9b34fb'
# UUID for first read characteristic.
TEST_READ_UUID = '0000e631-0000-1000-8000-00805f9b34fb'
# UUID for second read characteristic.
TEST_SECOND_READ_UUID = '0000e634-0000-1000-8000-00805f9b34fb'
# UUID for third read characteristic.
TEST_THIRD_READ_UUID = '0000e635-0000-1000-8000-00805f9b34fb'
# UUID for scan response.
TEST_SCAN_RESPONSE_UUID = '0000e639-0000-1000-8000-00805f9b34fb'
# Advertise settings in json format for BLE advertise.
# 'Timeout' is ADVERTISING_TIME multiplied by 1000 (presumably seconds
# converted to milliseconds -- confirm against the snippet API).
ADVERTISE_SETTINGS = {
    'AdvertiseMode': 'ADVERTISE_MODE_LOW_LATENCY',
    'Timeout': ADVERTISING_TIME * 1000,
    'Connectable': True,
    'TxPowerLevel': 'ADVERTISE_TX_POWER_ULTRA_LOW',
}
# Random data to represent device stored in advertise data.
# Like the other payloads below, it is generated once at import time.
DATA = utils.rand_ascii_str(16)
# Random data for scan response.
SCAN_RESPONSE_DATA = utils.rand_ascii_str(16)
# Random data for read operation.
READ_DATA = utils.rand_ascii_str(8)
# Random data for second read operation.
SECOND_READ_DATA = utils.rand_ascii_str(8)
# Random data for third read operation.
THIRD_READ_DATA = utils.rand_ascii_str(8)
# Random data for write operation.
WRITE_DATA = utils.rand_ascii_str(8)
# Random data for second write operation.
SECOND_WRITE_DATA = utils.rand_ascii_str(8)
# Advertise data in json format for BLE advertise.
ADVERTISE_DATA = {
    'IncludeDeviceName': False,
    'ServiceData': [{'UUID': TEST_BLE_SERVICE_UUID, 'Data': DATA}],
}
# Advertise data in json format representing scan response for BLE advertise.
SCAN_RESPONSE = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_SCAN_RESPONSE_UUID,
        'Data': SCAN_RESPONSE_DATA,
    }],
}
# Scan filter in json format for BLE scan.
SCAN_FILTER = {'ServiceUuid': TEST_BLE_SERVICE_UUID}
# Scan settings in json format for BLE scan.
SCAN_SETTINGS = {'ScanMode': 'SCAN_MODE_LOW_LATENCY'}
# Characteristics for write in json format.
WRITE_CHARACTERISTIC = {
    'UUID': TEST_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE',
}
SECOND_WRITE_CHARACTERISTIC = {
    'UUID': TEST_SECOND_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE',
}
# Characteristics for read in json format. Each one carries the 'Data' value
# that the read helpers later compare against.
READ_CHARACTERISTIC = {
    'UUID': TEST_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': READ_DATA,
}
SECOND_READ_CHARACTERISTIC = {
    'UUID': TEST_SECOND_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': SECOND_READ_DATA,
}
THIRD_READ_CHARACTERISTIC = {
    'UUID': TEST_THIRD_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': THIRD_READ_DATA,
}
# Service data in json format for BLE server: one primary service exposing
# the two write and three read characteristics defined above.
SERVICE = {
    'UUID': TEST_BLE_SERVICE_UUID,
    'Type': 'SERVICE_TYPE_PRIMARY',
    'Characteristics': [
        WRITE_CHARACTERISTIC,
        SECOND_WRITE_CHARACTERISTIC,
        READ_CHARACTERISTIC,
        SECOND_READ_CHARACTERISTIC,
        THIRD_READ_CHARACTERISTIC,
    ],
}
# Shared string literals: dictionary keys and expected values used when
# inspecting callback event data.
UUID = 'UUID'
GATT_SUCCESS = 'GATT_SUCCESS'
STATE = 'newState'
STATUS = 'status'
122
123
def IsRequiredScanResult(scan_result: callback_event.CallbackEvent) -> bool:
  """Tells whether a scan result event carries the test advertisement.

  Args:
    scan_result: The scan result event to inspect. Its data is expected to
      hold the service list under ['result']['ScanRecord']['Services'].

  Returns:
    True if any service entry has the test service UUID together with the
    advertised DATA payload, False otherwise.
  """
  advertised_services = scan_result.data['result']['ScanRecord']['Services']
  return any(
      entry[UUID] == TEST_BLE_SERVICE_UUID and entry['Data'] == DATA
      for entry in advertised_services
  )
130
131
def Discover(
    scanner: android_device.AndroidDevice,
    advertiser: android_device.AndroidDevice,
) -> dict[str, Any]:
  """Logic for BLE scan and advertising.

  Steps:
    1. Advertiser starts advertising and gets an onStartSuccess callback. The
       start commands are issued at most twice; the second attempt only
       happens when the first one produced no onStartSuccess event.
    2. Scanner starts scanning and finds advertiser from scan results.

  Verifies:
    The matching scan result contains both the advertised service data and
    the scan response data.

  Side effects:
    Stores the callback handlers on advertiser.advertise_callback and
    scanner.scan_callback, and the discovered device address on
    scanner.connect_to_address.

  Args:
    scanner: AndroidDevice. The device that starts BLE scan to find target.
    advertiser: AndroidDevice. The device that keeps advertising so other
      devices acknowledge it.

  Returns:
    dict. The 'result' payload of the matching onScanResult event.

  Raises:
    TimeoutError: No onStartSuccess event was received on any attempt. The
      scan wait may additionally raise the callback handler's own timeout
      error when no matching scan result arrives.
  """
  # Retry initial command in case command is lost after triggering a reset
  max_attempts = 2
  for attempt_num in range(max_attempts):
    advertiser.advertise_callback = advertiser.mbs.bleStartAdvertising(
        ADVERTISE_SETTINGS, ADVERTISE_DATA, SCAN_RESPONSE
    )
    scanner.scan_callback = scanner.mbs.bleStartScan(
        [SCAN_FILTER], SCAN_SETTINGS
    )
    success = False
    # Poll once per second for up to ADVERTISING_START_TIME seconds.
    for _ in range(ADVERTISING_START_TIME):
      if advertiser.advertise_callback.getAll('onStartFailure'):
        logging.warning(
            "'onStartFailure' event detected after bleStartAdvertising"
        )
      success = advertiser.advertise_callback.getAll('onStartSuccess')
      if success:
        break
      time.sleep(1)
    else:
      logging.error(
          'Timed out after %ss waiting for an "onStartSuccess" event ',
          ADVERTISING_START_TIME,
      )
    if success:
      # Advertising is running: stop retrying. Without this break the loop
      # would start a second advertiser and scan, orphan the first
      # callbacks, and could time out despite the earlier success.
      break
    if attempt_num < max_attempts - 1:
      logging.warning(
          "'onStartSuccess' event was not received after "
          'bleStartAdvertising. Retrying... (%d)',
          attempt_num + 1,
      )
    else:
      raise TimeoutError(
          f'Timed out after {max_attempts} retries of '
          f'{ADVERTISING_START_TIME}s waiting for an '
          '"onStartSuccess" event '
      )

  advertiser.log.info('BLE advertising started')
  # NOTE(review): this fixed sleep precedes a wait that already has its own
  # SCAN_TIMEOUT; kept as-is, confirm whether it is still required.
  time.sleep(SCAN_TIMEOUT)
  scan_result = scanner.scan_callback.waitForEvent(
      'onScanResult', IsRequiredScanResult, SCAN_TIMEOUT
  )
  scan_success = False
  scan_response_found = False
  result = scan_result.data['result']
  scan_start_to_result_time_ms = scan_result.data['StartToResultTimeDeltaMs']
  for service in result['ScanRecord']['Services']:
    if service[UUID] == TEST_BLE_SERVICE_UUID and service['Data'] == DATA:
      # Remember the advertiser's address for the later GATT connection.
      scanner.connect_to_address = result['Device']['Address']
      scan_success = True
    if (
        service[UUID] == TEST_SCAN_RESPONSE_UUID
        and service['Data'] == SCAN_RESPONSE_DATA
    ):
      scan_response_found = True
  asserts.assert_true(
      scan_success, 'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT
  )
  asserts.assert_true(scan_response_found, 'Scan response is not found')
  logging.info('Discovery metrics: %d', scan_start_to_result_time_ms)
  return result
219
220
def StartScanning(
    scanner: android_device.AndroidDevice, scan_duration: int
) -> list[dict[str, Any]]:
  """Starts an unfiltered BLE scan and collects the scan results.

  Steps:
    1. Scanner starts scanning, with up to three attempts.
    2. After each scan_duration sleep, the collected onScanResult events are
       fetched; the first non-empty batch ends the scan attempts.

  Verifies:
    At least one scan result was received.

  Side effects:
    Stores the callback handler of the latest scan on scanner.scan_callback.

  Args:
    scanner: AndroidDevice. The device that starts BLE scan to find advertisers.
    scan_duration: Number of seconds to sleep before each check for results.
      Each attempt performs int(SCAN_TIMEOUT / scan_duration) checks.

  Returns:
    List of dicts, the 'result' payload of every collected onScanResult event.

  Raises:
    TimeoutError: The last attempt finished without any onScanResult event.
  """
  # Retry initial command in case command is lost after triggering a reset
  attempt_limit = 3
  collected_events = None
  for attempt_index in range(attempt_limit):
    scanner.scan_callback = scanner.mbs.bleStartScan()
    scanner.log.info('BLE scanning started')
    if scanner.scan_callback.getAll('onScanFailed'):
      logging.warning("'onScanFailed' event detected after bleStartScan")
      continue

    got_results = False
    checks_left = int(SCAN_TIMEOUT / scan_duration)
    while checks_left > 0 and not got_results:
      time.sleep(scan_duration)
      collected_events = scanner.scan_callback.getAll('onScanResult')
      got_results = bool(collected_events)
      checks_left -= 1
    if got_results:
      break

    # Every check of this attempt came back empty.
    logging.error(
        'Timed out after %ss waiting for an "onScanResult" event ',
        SCAN_TIMEOUT,
    )
    if attempt_index >= attempt_limit - 1:
      raise TimeoutError(
          f'Timed out after {attempt_limit} retries of '
          f'{SCAN_TIMEOUT}s waiting for an '
          '"onScanResult" event '
      )
    logging.warning(
        "'onScanResult' event was not received after "
        'bleStartScan. Retrying... (%d)',
        attempt_index + 1,
    )

  if collected_events:
    found_results = [event.data['result'] for event in collected_events]
  else:
    found_results = []

  asserts.assert_true(
      bool(collected_events),
      'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT,
  )
  return found_results
290
291
def StopDiscover(
    scanner: android_device.AndroidDevice,
    advertiser: android_device.AndroidDevice,
) -> None:
  """Stops the BLE scan first and the BLE advertising second.

  Both devices must already hold the callback handler of the operation being
  stopped (scanner.scan_callback and advertiser.advertise_callback).

  Args:
    scanner: AndroidDevice. The device that started the BLE scan.
    advertiser: AndroidDevice. The device that is currently advertising.
  """
  # Scanner side.
  scan_id = scanner.scan_callback.callback_id
  scanner.mbs.bleStopScan(scan_id)
  scanner.log.info('BLE scanning stopped')

  # Advertiser side.
  advertise_id = advertiser.advertise_callback.callback_id
  advertiser.mbs.bleStopAdvertising(advertise_id)
  advertiser.log.info('BLE advertising stopped')
311
312
def StopScanning(scanner: android_device.AndroidDevice) -> None:
  """Stops the BLE scan identified by scanner.scan_callback.

  Args:
    scanner: AndroidDevice. The device whose running BLE scan is stopped.
  """
  scan_id = scanner.scan_callback.callback_id
  scanner.mbs.bleStopScan(scan_id)
  scanner.log.info('BLE scanning stopped')
324
325
def Connect(
    client: android_device.AndroidDevice, server: android_device.AndroidDevice
) -> None:
  """Logic for creating a Gatt connection between a client and a server.

  Steps:
    1. Server starts and service added properly.
    2. Client connects to server via Gatt, connection completes with
    GATT_SUCCESS within TIMEOUT, onConnectionStateChange/STATE_CONNECTED is
    called EXACTLY once.

  Verifies:
    Both the client and the server consider themselves connected to each other.

  Side effects:
    Stores the callback handlers on server.server_callback and
    client.client_callback. Reads client.connect_to_address, which Discover()
    sets on its scanner device.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.
  """
  server.server_callback = server.mbs.bleStartServer([SERVICE])
  start_server_result = server.server_callback.waitAndGet('onServiceAdded', 30)
  asserts.assert_equal(start_server_result.data[STATUS], GATT_SUCCESS)
  # Every characteristic of SERVICE must be reported back by the server.
  uuids = [
      characteristic[UUID]
      for characteristic in start_server_result.data['Service'][
          'Characteristics'
      ]
  ]
  for uuid in [
      characteristic[UUID] for characteristic in SERVICE['Characteristics']
  ]:
    asserts.assert_true(uuid in uuids, 'Failed to find uuid %s.' % uuid)
  server.log.info('BLE server started')
  client.client_callback = client.mbs.bleConnectGatt(client.connect_to_address)
  start_client_result = client.client_callback.waitAndGet(
      'onConnectionStateChange', CONNECTION_TIMEOUT
  )
  # Any further state change event means the callback fired more than once.
  extra_events = client.client_callback.getAll('onConnectionStateChange')
  # assert_false takes (expr, msg, extras) and does no %-formatting itself,
  # so the events are interpolated into the message here instead of being
  # passed as a third argument.
  asserts.assert_false(
      extra_events,
      'Got unexpected onConnectionStateChange events: %s' % (extra_events,),
  )
  asserts.assert_equal(start_client_result.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(start_client_result.data[STATE], 'STATE_CONNECTED')
  client.log.info('BLE client connected')
  # Verify that the server side also considers itself connected.
  server_event = server.server_callback.waitAndGet('onConnectionStateChange')
  asserts.assert_equal(server_event.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(
      server_event.data[STATE],
      'STATE_CONNECTED',
      'The server side does not consider itself connected, error!',
  )
  logging.info('Gatt connection complete.')
  logging.info(
      'Connection metrics: %d', start_client_result.data['gattConnectionTimeMs']
  )
383
384
def Disconnect(
    client: android_device.AndroidDevice, server: android_device.AndroidDevice
) -> None:
  """Tears down the GATT link: client disconnects, then the server stops.

  Steps:
    1. Client calls disconnect and waits up to 30 seconds for an
    onConnectionStateChange event.
    2. Server is stopped.

  Verifies:
    The client event reports GATT_SUCCESS and STATE_DISCONNECTED.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.
  """
  client.mbs.bleDisconnect()
  disconnect_event = client.client_callback.waitAndGet(
      'onConnectionStateChange', 30
  )
  expectations = ((STATUS, GATT_SUCCESS), (STATE, 'STATE_DISCONNECTED'))
  for field, expected_value in expectations:
    asserts.assert_equal(disconnect_event.data[field], expected_value)
  client.log.info('BLE client disconnected')

  server.mbs.bleStopServer()
  server.log.info('BLE server stopped')
410
411
def DiscoverServices(client: android_device.AndroidDevice) -> None:
  """Runs GATT service discovery on the client and checks the outcome.

  Steps:
    1. Client triggers service discovery, then sleeps CONNECTION_TIMEOUT
    seconds before collecting the onServiceDiscovered events.
    2. The test service is looked up among the discovered services.

  Verifies:
    Exactly one onServiceDiscovered event arrived, it reports GATT_SUCCESS,
    the test service is present, and each matching service exposes every
    characteristic UUID listed in SERVICE.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
  """
  client.mbs.bleDiscoverServices()
  # Presumably waits out the whole window so that duplicate callbacks would
  # show up in the count check below.
  time.sleep(CONNECTION_TIMEOUT)
  discovery_events = client.client_callback.getAll('onServiceDiscovered')
  asserts.assert_equal(len(discovery_events), 1)
  discovery_data = discovery_events[0].data
  asserts.assert_equal(discovery_data[STATUS], GATT_SUCCESS)

  expected_uuids = [entry[UUID] for entry in SERVICE['Characteristics']]
  matching_services = 0
  for discovered in discovery_data['Services']:
    if discovered['UUID'] != TEST_BLE_SERVICE_UUID:
      continue
    matching_services += 1
    found_uuids = [entry[UUID] for entry in discovered['Characteristics']]
    for expected_uuid in expected_uuids:
      asserts.assert_true(
          expected_uuid in found_uuids,
          'Failed to find uuid %s.' % expected_uuid,
      )
  asserts.assert_true(
      matching_services > 0, 'Failed to discover the customize service'
  )
  client.log.info('BLE discover services finished')
449
450
def ReadCharacteristic(client: android_device.AndroidDevice) -> None:
  """Reads the three test characteristics one after another.

  Steps (repeated for the first, second and third read characteristic):
    1. Client requests a read of the characteristic and gets a truthy result.
    2. Client waits up to 30 seconds for an onCharacteristicRead event.

  Verifies:
    Each event reports GATT_SUCCESS and carries the data configured for that
    characteristic.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
  """
  # (characteristic UUID, expected payload, log line once the read passed)
  read_cases = (
      (TEST_READ_UUID, READ_DATA, 'Read operation finished'),
      (
          TEST_SECOND_READ_UUID,
          SECOND_READ_DATA,
          'Second read operation finished',
      ),
      (
          TEST_THIRD_READ_UUID,
          THIRD_READ_DATA,
          'Third read operation finished',
      ),
  )
  for characteristic_uuid, expected_data, done_message in read_cases:
    read_started = client.mbs.bleReadOperation(
        TEST_BLE_SERVICE_UUID, characteristic_uuid
    )
    asserts.assert_true(read_started, 'BLE read operation failed to start')
    read_event = client.client_callback.waitAndGet('onCharacteristicRead', 30)
    asserts.assert_equal(read_event.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(read_event.data['Data'], expected_data)
    client.log.info(done_message)
500
501
def WriteCharacteristic(
    client: android_device.AndroidDevice, server: android_device.AndroidDevice
) -> None:
  """Writes the two test characteristics one after another.

  Steps (repeated for the first and second write characteristic):
    1. Client requests a write of the characteristic and gets a truthy result.
    2. Server waits up to 30 seconds for an onCharacteristicWriteRequest
    event.
    3. Client waits up to 30 seconds for an onCharacteristicWrite event.

  Verifies:
    The data received by the server equals the data the client wrote, and the
    client gets its write callback.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.
  """
  # (characteristic UUID, payload to write, log line once the write passed)
  write_cases = (
      (TEST_WRITE_UUID, WRITE_DATA, 'Write operation finished'),
      (
          TEST_SECOND_WRITE_UUID,
          SECOND_WRITE_DATA,
          'Second write operation finished',
      ),
  )
  for characteristic_uuid, payload, done_message in write_cases:
    write_started = client.mbs.bleWriteOperation(
        TEST_BLE_SERVICE_UUID, characteristic_uuid, payload
    )
    asserts.assert_true(write_started, 'BLE write operation failed to start')
    write_request = server.server_callback.waitAndGet(
        'onCharacteristicWriteRequest', 30
    )
    asserts.assert_equal(write_request.data['Data'], payload)
    client.client_callback.waitAndGet('onCharacteristicWrite', 30)
    client.log.info(done_message)
544