1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import inspect 18import logging 19 20from queue import Empty 21 22from acts.controllers.android_device import AndroidDevice 23from acts.controllers.fuchsia_device import FuchsiaDevice 24from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes 25from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings 26from acts_contrib.test_utils.bt.bt_constants import gatt_event 27from acts_contrib.test_utils.bt.bt_constants import scan_result 28from acts_contrib.test_utils.bt.bt_gatt_utils import GattTestUtilsError 29from acts_contrib.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection 30from acts_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_connection 31from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name 32 33import acts_contrib.test_utils.bt.bt_test_utils as bt_test_utils 34 35 36def create_bluetooth_device(hardware_device): 37 """Creates a generic Bluetooth device based on type of device that is sent 38 to the functions. 39 40 Args: 41 hardware_device: A Bluetooth hardware device that is supported by ACTS. 42 """ 43 if isinstance(hardware_device, FuchsiaDevice): 44 return FuchsiaBluetoothDevice(hardware_device) 45 elif isinstance(hardware_device, AndroidDevice): 46 return AndroidBluetoothDevice(hardware_device) 47 else: 48 raise ValueError('Unable to create BluetoothDevice for type %s' % 49 type(hardware_device)) 50 51 52class BluetoothDevice(object): 53 """Class representing a generic Bluetooth device. 54 55 Each object of this class represents a generic Bluetooth device. 56 Android device and Fuchsia devices are the currently supported devices. 57 58 Attributes: 59 device: A generic Bluetooth device. 60 """ 61 62 def __init__(self, device): 63 self.device = device 64 self.log = logging 65 66 def a2dp_initiate_open_stream(self): 67 """Base generic Bluetooth interface. Only called if not overridden by 68 another supported device. 69 """ 70 raise NotImplementedError("{} must be defined.".format( 71 inspect.currentframe().f_code.co_name)) 72 73 def start_profile_a2dp_sink(self): 74 """Base generic Bluetooth interface. Only called if not overridden by 75 another supported device. 76 """ 77 raise NotImplementedError("{} must be defined.".format( 78 inspect.currentframe().f_code.co_name)) 79 80 def stop_profile_a2dp_sink(self): 81 """Base generic Bluetooth interface. Only called if not overridden by 82 another supported device. 83 """ 84 raise NotImplementedError("{} must be defined.".format( 85 inspect.currentframe().f_code.co_name)) 86 87 def start_pairing_helper(self): 88 """Base generic Bluetooth interface. Only called if not overridden by 89 another supported device. 90 """ 91 raise NotImplementedError("{} must be defined.".format( 92 inspect.currentframe().f_code.co_name)) 93 94 def set_discoverable(self, is_discoverable): 95 """Base generic Bluetooth interface. Only called if not overridden by 96 another supported device. 97 """ 98 raise NotImplementedError("{} must be defined.".format( 99 inspect.currentframe().f_code.co_name)) 100 101 def bluetooth_toggle_state(self, state): 102 """Base generic Bluetooth interface. Only called if not overridden by 103 another supported device. 104 """ 105 raise NotImplementedError("{} must be defined.".format( 106 inspect.currentframe().f_code.co_name)) 107 108 def gatt_client_discover_characteristic_by_uuid(self, peer_identifier, 109 uuid): 110 """Base generic Bluetooth interface. Only called if not overridden by 111 another supported device. 112 """ 113 raise NotImplementedError("{} must be defined.".format( 114 inspect.currentframe().f_code.co_name)) 115 116 def initialize_bluetooth_controller(self): 117 """Base generic Bluetooth interface. Only called if not overridden by 118 another supported device. 119 """ 120 raise NotImplementedError("{} must be defined.".format( 121 inspect.currentframe().f_code.co_name)) 122 123 def get_pairing_pin(self): 124 """Base generic Bluetooth interface. Only called if not overridden by 125 another supported device. 126 """ 127 raise NotImplementedError("{} must be defined.".format( 128 inspect.currentframe().f_code.co_name)) 129 130 def input_pairing_pin(self, pin): 131 """Base generic Bluetooth interface. Only called if not overridden by 132 another supported device. 133 """ 134 raise NotImplementedError("{} must be defined.".format( 135 inspect.currentframe().f_code.co_name)) 136 137 def get_bluetooth_local_address(self): 138 """Base generic Bluetooth interface. Only called if not overridden by 139 another supported device. 140 """ 141 raise NotImplementedError("{} must be defined.".format( 142 inspect.currentframe().f_code.co_name)) 143 144 def gatt_connect(self, peer_identifier, transport, autoconnect): 145 """Base generic Bluetooth interface. Only called if not overridden by 146 another supported device. 147 """ 148 raise NotImplementedError("{} must be defined.".format( 149 inspect.currentframe().f_code.co_name)) 150 151 def gatt_client_write_characteristic_without_response_by_handle( 152 self, peer_identifier, handle, value): 153 """Base generic Bluetooth interface. Only called if not overridden by 154 another supported device. 155 """ 156 raise NotImplementedError("{} must be defined.".format( 157 inspect.currentframe().f_code.co_name)) 158 159 def gatt_client_write_characteristic_by_handle(self, peer_identifier, 160 handle, offset, value): 161 """Base generic Bluetooth interface. Only called if not overridden by 162 another supported device. 163 """ 164 raise NotImplementedError("{} must be defined.".format( 165 inspect.currentframe().f_code.co_name)) 166 167 def gatt_client_read_characteristic_by_handle(self, peer_identifier, 168 handle): 169 """Base generic Bluetooth interface. Only called if not overridden by 170 another supported device. 171 """ 172 raise NotImplementedError("{} must be defined.".format( 173 inspect.currentframe().f_code.co_name)) 174 175 def gatt_client_read_characteristic_by_uuid(self, peer_identifier, uuid): 176 """Base generic Bluetooth interface. Only called if not overridden by 177 another supported device. 178 """ 179 raise NotImplementedError("{} must be defined.".format( 180 inspect.currentframe().f_code.co_name)) 181 182 def gatt_client_read_long_characteristic_by_handle(self, peer_identifier, 183 handle, offset, 184 max_bytes): 185 """Base generic Bluetooth interface. Only called if not overridden by 186 another supported device. 187 """ 188 raise NotImplementedError("{} must be defined.".format( 189 inspect.currentframe().f_code.co_name)) 190 191 def gatt_client_enable_notifiy_characteristic_by_handle( 192 self, peer_identifier, handle): 193 """Base generic Bluetooth interface. Only called if not overridden by 194 another supported device. 195 """ 196 raise NotImplementedError("{} must be defined.".format( 197 inspect.currentframe().f_code.co_name)) 198 199 def gatt_client_disable_notifiy_characteristic_by_handle( 200 self, peer_identifier, handle): 201 """Base generic Bluetooth interface. Only called if not overridden by 202 another supported device. 203 """ 204 raise NotImplementedError("{} must be defined.".format( 205 inspect.currentframe().f_code.co_name)) 206 207 def gatt_client_read_descriptor_by_handle(self, peer_identifier, handle): 208 """Base generic Bluetooth interface. Only called if not overridden by 209 another supported device. 210 """ 211 raise NotImplementedError("{} must be defined.".format( 212 inspect.currentframe().f_code.co_name)) 213 214 def gatt_client_write_descriptor_by_handle(self, peer_identifier, handle, 215 offset, value): 216 """Base generic Bluetooth interface. Only called if not overridden by 217 another supported device. 218 """ 219 raise NotImplementedError("{} must be defined.".format( 220 inspect.currentframe().f_code.co_name)) 221 222 def gatt_client_long_read_descriptor_by_handle(self, peer_identifier, 223 handle, offset, max_bytes): 224 """Base generic Bluetooth interface. Only called if not overridden by 225 another supported device. 226 """ 227 raise NotImplementedError("{} must be defined.".format( 228 inspect.currentframe().f_code.co_name)) 229 230 def gatt_disconnect(self, peer_identifier): 231 """Base generic Bluetooth interface. Only called if not overridden by 232 another supported device. 233 """ 234 raise NotImplementedError("{} must be defined.".format( 235 inspect.currentframe().f_code.co_name)) 236 237 def gatt_client_refresh(self, peer_identifier): 238 """Base generic Bluetooth interface. Only called if not overridden by 239 another supported device. 240 """ 241 raise NotImplementedError("{} must be defined.".format( 242 inspect.currentframe().f_code.co_name)) 243 244 def le_scan_with_name_filter(self, name, timeout): 245 """Base generic Bluetooth interface. Only called if not overridden by 246 another supported device. 247 """ 248 raise NotImplementedError("{} must be defined.".format( 249 inspect.currentframe().f_code.co_name)) 250 251 def log_info(self, log): 252 """Base generic Bluetooth interface. Only called if not overridden by 253 another supported device. 254 """ 255 raise NotImplementedError("{} must be defined.".format( 256 inspect.currentframe().f_code.co_name)) 257 258 def reset_bluetooth(self): 259 """Base generic Bluetooth interface. Only called if not overridden by 260 another supported device. 261 """ 262 raise NotImplementedError("{} must be defined.".format( 263 inspect.currentframe().f_code.co_name)) 264 265 def sdp_add_search(self, attribute_list, profile_id): 266 """Base generic Bluetooth interface. Only called if not overridden by 267 another supported device. 268 """ 269 raise NotImplementedError("{} must be defined.".format( 270 inspect.currentframe().f_code.co_name)) 271 272 def sdp_add_service(self, sdp_record): 273 """Base generic Bluetooth interface. Only called if not overridden by 274 another supported device. 275 """ 276 raise NotImplementedError("{} must be defined.".format( 277 inspect.currentframe().f_code.co_name)) 278 279 def sdp_clean_up(self): 280 """Base generic Bluetooth interface. Only called if not overridden by 281 another supported device. 282 """ 283 raise NotImplementedError("{} must be defined.".format( 284 inspect.currentframe().f_code.co_name)) 285 286 def sdp_init(self): 287 """Base generic Bluetooth interface. Only called if not overridden by 288 another supported device. 289 """ 290 raise NotImplementedError("{} must be defined.".format( 291 inspect.currentframe().f_code.co_name)) 292 293 def sdp_remove_service(self, service_id): 294 """Base generic Bluetooth interface. Only called if not overridden by 295 another supported device. 296 """ 297 raise NotImplementedError("{} must be defined.".format( 298 inspect.currentframe().f_code.co_name)) 299 300 def start_le_advertisement(self, adv_data, scan_response, adv_interval, 301 connectable): 302 """Base generic Bluetooth interface. Only called if not overridden by 303 another supported device. 304 """ 305 raise NotImplementedError("{} must be defined.".format( 306 inspect.currentframe().f_code.co_name)) 307 308 def stop_le_advertisement(self): 309 """Base generic Bluetooth interface. Only called if not overridden by 310 another supported device. 311 """ 312 raise NotImplementedError("{} must be defined.".format( 313 inspect.currentframe().f_code.co_name)) 314 315 def set_bluetooth_local_name(self, name): 316 """Base generic Bluetooth interface. Only called if not overridden by 317 another supported device. 318 """ 319 raise NotImplementedError("{} must be defined.".format( 320 inspect.currentframe().f_code.co_name)) 321 322 def setup_gatt_server(self, database): 323 """Base generic Bluetooth interface. Only called if not overridden by 324 another supported device. 325 """ 326 raise NotImplementedError("{} must be defined.".format( 327 inspect.currentframe().f_code.co_name)) 328 329 def close_gatt_server(self): 330 """Base generic Bluetooth interface. Only called if not overridden by 331 another supported device. 332 """ 333 raise NotImplementedError("{} must be defined.".format( 334 inspect.currentframe().f_code.co_name)) 335 336 def unbond_device(self, peer_identifier): 337 """Base generic Bluetooth interface. Only called if not overridden by 338 another supported device. 339 """ 340 raise NotImplementedError("{} must be defined.".format( 341 inspect.currentframe().f_code.co_name)) 342 343 def unbond_all_known_devices(self): 344 """Base generic Bluetooth interface. Only called if not overridden by 345 another supported device. 346 """ 347 raise NotImplementedError("{} must be defined.".format( 348 inspect.currentframe().f_code.co_name)) 349 350 def init_pair(self, peer_identifier, security_level, non_bondable, 351 transport): 352 """Base generic Bluetooth interface. Only called if not overridden by 353 another supported device. 354 """ 355 raise NotImplementedError("{} must be defined.".format( 356 inspect.currentframe().f_code.co_name)) 357 358 359class AndroidBluetoothDevice(BluetoothDevice): 360 """Class wrapper for an Android Bluetooth device. 361 362 Each object of this class represents a generic Bluetooth device. 363 Android device and Fuchsia devices are the currently supported devices/ 364 365 Attributes: 366 android_device: An Android Bluetooth device. 367 """ 368 369 def __init__(self, android_device): 370 super().__init__(android_device) 371 self.gatt_timeout = 10 372 self.peer_mapping = {} 373 self.discovered_services_index = None 374 375 def _client_wait(self, gatt_event, gatt_callback): 376 return self._timed_pop(gatt_event, gatt_callback) 377 378 def _timed_pop(self, gatt_event, gatt_callback): 379 expected_event = gatt_event["evt"].format(gatt_callback) 380 try: 381 return self.device.ed.pop_event(expected_event, self.gatt_timeout) 382 except Empty as emp: 383 raise AssertionError(gatt_event["err"].format(expected_event)) 384 385 def _setup_discovered_services_index(self, bluetooth_gatt): 386 """ Sets the discovered services index for the gatt connection 387 related to the Bluetooth GATT callback object. 388 389 Args: 390 bluetooth_gatt: The BluetoothGatt callback id 391 """ 392 if not self.discovered_services_index: 393 self.device.droid.gattClientDiscoverServices(bluetooth_gatt) 394 expected_event = gatt_cb_strings['gatt_serv_disc'].format( 395 self.gatt_callback) 396 event = self.dut.ed.pop_event(expected_event, self.gatt_timeout) 397 self.discovered_services_index = event['data']['ServicesIndex'] 398 399 def a2dp_initiate_open_stream(self): 400 raise NotImplementedError("{} not yet implemented.".format( 401 inspect.currentframe().f_code.co_name)) 402 403 def start_profile_a2dp_sink(self): 404 raise NotImplementedError("{} not yet implemented.".format( 405 inspect.currentframe().f_code.co_name)) 406 407 def stop_profile_a2dp_sink(self): 408 raise NotImplementedError("{} not yet implemented.".format( 409 inspect.currentframe().f_code.co_name)) 410 411 def bluetooth_toggle_state(self, state): 412 self.device.droid.bluetoothToggleState(state) 413 414 def set_discoverable(self, is_discoverable): 415 """ Sets the device's discoverability. 416 417 Args: 418 is_discoverable: True if discoverable, false if not discoverable 419 """ 420 if is_discoverable: 421 self.device.droid.bluetoothMakeDiscoverable() 422 else: 423 self.device.droid.bluetoothMakeUndiscoverable() 424 425 def initialize_bluetooth_controller(self): 426 """ Just pass for Android as there is no concept of initializing 427 a Bluetooth controller. 428 """ 429 430 def start_pairing_helper(self): 431 """ Starts the Android pairing helper. 432 """ 433 self.device.droid.bluetoothStartPairingHelper(True) 434 435 def gatt_client_write_characteristic_without_response_by_handle( 436 self, peer_identifier, handle, value): 437 """ Perform a GATT Client write Characteristic without response to 438 remote peer GATT server database. 439 440 Args: 441 peer_identifier: The mac address associated with the GATT connection 442 handle: The characteristic handle (or instance id). 443 value: The list of bytes to write. 444 Returns: 445 True if success, False if failure. 446 """ 447 peer_info = self.peer_mapping.get(peer_identifier) 448 if not peer_info: 449 self.log.error( 450 "Peer idenifier {} not currently connected or unknown.".format( 451 peer_identifier)) 452 return False 453 self._setup_discovered_services_index() 454 self.device.droid.gattClientWriteCharacteristicByInstanceId( 455 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 456 handle, value) 457 try: 458 event = self._client_wait(gatt_event['char_write'], 459 peer_info.get('gatt_callback')) 460 except AssertionError as err: 461 self.log.error("Failed to write Characteristic: {}".format(err)) 462 return True 463 464 def gatt_client_write_characteristic_by_handle(self, peer_identifier, 465 handle, offset, value): 466 """ Perform a GATT Client write Characteristic without response to 467 remote peer GATT server database. 468 469 Args: 470 peer_identifier: The mac address associated with the GATT connection 471 handle: The characteristic handle (or instance id). 472 offset: Not used yet. 473 value: The list of bytes to write. 474 Returns: 475 True if success, False if failure. 476 """ 477 peer_info = self.peer_mapping.get(peer_identifier) 478 if not peer_info: 479 self.log.error( 480 "Peer idenifier {} not currently connected or unknown.".format( 481 peer_identifier)) 482 return False 483 self._setup_discovered_services_index() 484 self.device.droid.gattClientWriteCharacteristicByInstanceId( 485 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 486 handle, value) 487 try: 488 event = self._client_wait(gatt_event['char_write'], 489 peer_info.get('gatt_callback')) 490 except AssertionError as err: 491 self.log.error("Failed to write Characteristic: {}".format(err)) 492 return True 493 494 def gatt_client_read_characteristic_by_handle(self, peer_identifier, 495 handle): 496 """ Perform a GATT Client read Characteristic to remote peer GATT 497 server database. 498 499 Args: 500 peer_identifier: The mac address associated with the GATT connection 501 handle: The characteristic handle (or instance id). 502 Returns: 503 Value of Characteristic if success, None if failure. 504 """ 505 peer_info = self.peer_mapping.get(peer_identifier) 506 if not peer_info: 507 self.log.error( 508 "Peer idenifier {} not currently connected or unknown.".format( 509 peer_identifier)) 510 return False 511 self._setup_discovered_services_index() 512 self.dut.droid.gattClientReadCharacteristicByInstanceId( 513 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 514 handle) 515 try: 516 event = self._client_wait(gatt_event['char_read'], 517 peer_info.get('gatt_callback')) 518 except AssertionError as err: 519 self.log.error("Failed to read Characteristic: {}".format(err)) 520 521 return event['data']['CharacteristicValue'] 522 523 def gatt_client_read_long_characteristic_by_handle(self, peer_identifier, 524 handle, offset, 525 max_bytes): 526 """ Perform a GATT Client read Characteristic to remote peer GATT 527 server database. 528 529 Args: 530 peer_identifier: The mac address associated with the GATT connection 531 offset: Not used yet. 532 handle: The characteristic handle (or instance id). 533 max_bytes: Not used yet. 534 Returns: 535 Value of Characteristic if success, None if failure. 536 """ 537 peer_info = self.peer_mapping.get(peer_identifier) 538 if not peer_info: 539 self.log.error( 540 "Peer idenifier {} not currently connected or unknown.".format( 541 peer_identifier)) 542 return False 543 self._setup_discovered_services_index() 544 self.dut.droid.gattClientReadCharacteristicByInstanceId( 545 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 546 handle) 547 try: 548 event = self._client_wait(gatt_event['char_read'], 549 peer_info.get('gatt_callback')) 550 except AssertionError as err: 551 self.log.error("Failed to read Characteristic: {}".format(err)) 552 553 return event['data']['CharacteristicValue'] 554 555 def gatt_client_enable_notifiy_characteristic_by_handle( 556 self, peer_identifier, handle): 557 """ Perform a GATT Client enable Characteristic notification to remote 558 peer GATT server database. 559 560 Args: 561 peer_identifier: The mac address associated with the GATT connection 562 handle: The characteristic handle. 563 Returns: 564 True is success, False if failure. 565 """ 566 raise NotImplementedError("{} not yet implemented.".format( 567 inspect.currentframe().f_code.co_name)) 568 569 def gatt_client_disable_notifiy_characteristic_by_handle( 570 self, peer_identifier, handle): 571 """ Perform a GATT Client disable Characteristic notification to remote 572 peer GATT server database. 573 574 Args: 575 peer_identifier: The mac address associated with the GATT connection 576 handle: The characteristic handle. 577 Returns: 578 True is success, False if failure. 579 """ 580 raise NotImplementedError("{} not yet implemented.".format( 581 inspect.currentframe().f_code.co_name)) 582 583 def gatt_client_read_descriptor_by_handle(self, peer_identifier, handle): 584 """ Perform a GATT Client read Descriptor to remote peer GATT 585 server database. 586 587 Args: 588 peer_identifier: The mac address associated with the GATT connection 589 handle: The Descriptor handle (or instance id). 590 Returns: 591 Value of Descriptor if success, None if failure. 592 """ 593 peer_info = self.peer_mapping.get(peer_identifier) 594 if not peer_info: 595 self.log.error( 596 "Peer idenifier {} not currently connected or unknown.".format( 597 peer_identifier)) 598 return False 599 self._setup_discovered_services_index() 600 self.dut.droid.gattClientReadDescriptorByInstanceId( 601 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 602 handle) 603 try: 604 event = self._client_wait(gatt_event['desc_read'], 605 peer_info.get('gatt_callback')) 606 except AssertionError as err: 607 self.log.error("Failed to read Descriptor: {}".format(err)) 608 # TODO: Implement sending Descriptor value in SL4A such that the data 609 # can be represented by: event['data']['DescriptorValue'] 610 return "" 611 612 def gatt_client_write_descriptor_by_handle(self, peer_identifier, handle, 613 offset, value): 614 """ Perform a GATT Client write Descriptor to the remote peer GATT 615 server database. 616 617 Args: 618 peer_identifier: The mac address associated with the GATT connection 619 handle: The Descriptor handle (or instance id). 620 offset: Not used yet 621 value: The list of bytes to write. 622 Returns: 623 True if success, False if failure. 624 """ 625 peer_info = self.peer_mapping.get(peer_identifier) 626 if not peer_info: 627 self.log.error( 628 "Peer idenifier {} not currently connected or unknown.".format( 629 peer_identifier)) 630 return False 631 self._setup_discovered_services_index() 632 self.device.droid.gattClientWriteDescriptorByInstanceId( 633 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 634 handle, value) 635 try: 636 event = self._client_wait(gatt_event['desc_write'], 637 peer_info.get('gatt_callback')) 638 except AssertionError as err: 639 self.log.error("Failed to write Characteristic: {}".format(err)) 640 return True 641 642 def gatt_connect(self, peer_identifier, transport, autoconnect=False): 643 """ Perform a GATT connection to a perihperal. 644 645 Args: 646 peer_identifier: The mac address to connect to. 647 transport: Which transport to use. 648 autoconnect: Set autocnnect to True or False. 649 Returns: 650 True if success, False if failure. 651 """ 652 try: 653 bluetooth_gatt, gatt_callback = setup_gatt_connection( 654 self.device, peer_identifier, autoconnect, transport) 655 self.peer_mapping[peer_identifier] = { 656 "bluetooth_gatt": bluetooth_gatt, 657 "gatt_callback": gatt_callback 658 } 659 except GattTestUtilsError as err: 660 self.log.error(err) 661 return False 662 return True 663 664 def gatt_disconnect(self, peer_identifier): 665 """ Perform a GATT disconnect from a perihperal. 666 667 Args: 668 peer_identifier: The peer to disconnect from. 669 Returns: 670 True if success, False if failure. 671 """ 672 peer_info = self.peer_mapping.get(peer_identifier) 673 if not peer_info: 674 self.log.error( 675 "No previous connections made to {}".format(peer_identifier)) 676 return False 677 678 try: 679 disconnect_gatt_connection(self.device, 680 peer_info.get("bluetooth_gatt"), 681 peer_info.get("gatt_callback")) 682 self.device.droid.gattClientClose(peer_info.get("bluetooth_gatt")) 683 except GattTestUtilsError as err: 684 self.log.error(err) 685 return False 686 self.device.droid.gattClientClose(peer_info.get("bluetooth_gatt")) 687 688 def gatt_client_refresh(self, peer_identifier): 689 """ Perform a GATT Client Refresh of a perihperal. 690 691 Clears the internal cache and forces a refresh of the services from the 692 remote device. 693 694 Args: 695 peer_identifier: The peer to refresh. 696 """ 697 peer_info = self.peer_mapping.get(peer_identifier) 698 if not peer_info: 699 self.log.error( 700 "No previous connections made to {}".format(peer_identifier)) 701 return False 702 self.device.droid.gattClientRefresh(peer_info["bluetooth_gatt"]) 703 704 def le_scan_with_name_filter(self, name, timeout): 705 """ Scan over LE for a specific device name. 706 707 Args: 708 name: The name filter to set. 709 timeout: The timeout to wait to find the advertisement. 710 Returns: 711 Discovered mac address or None 712 """ 713 self.device.droid.bleSetScanSettingsScanMode( 714 ble_scan_settings_modes['low_latency']) 715 filter_list = self.device.droid.bleGenFilterList() 716 scan_settings = self.device.droid.bleBuildScanSetting() 717 scan_callback = self.device.droid.bleGenScanCallback() 718 self.device.droid.bleSetScanFilterDeviceName(name) 719 self.device.droid.bleBuildScanFilter(filter_list) 720 self.device.droid.bleSetScanFilterDeviceName(self.name) 721 self.device.droid.bleStartBleScan(filter_list, scan_settings, 722 scan_callback) 723 try: 724 event = self.device.ed.pop_event(scan_result.format(scan_callback), 725 timeout) 726 return event['data']['Result']['deviceInfo']['address'] 727 except Empty as err: 728 self.log.info("Scanner did not find advertisement {}".format(err)) 729 return None 730 731 def log_info(self, log): 732 """ Log directly onto the device. 733 734 Args: 735 log: The informative log. 736 """ 737 self.device.droid.log.logI(log) 738 739 def set_bluetooth_local_name(self, name): 740 """ Sets the Bluetooth controller's local name 741 Args: 742 name: The name to set. 743 """ 744 self.device.droid.bluetoothSetLocalName(name) 745 746 def get_local_bluetooth_address(self): 747 """ Returns the Bluetooth local address. 748 """ 749 return self.device.droid.bluetoothGetLocalAddress() 750 751 def reset_bluetooth(self): 752 """ Resets Bluetooth on the Android Device. 753 """ 754 bt_test_utils.reset_bluetooth([self.device]) 755 756 def sdp_add_search(self, attribute_list, profile_id): 757 """Adds an SDP search record. 758 Args: 759 attribute_list: The list of attributes to set 760 profile_id: The profile ID to set. 761 """ 762 # Android devices currently have no hooks to modify the SDP record. 763 764 def sdp_add_service(self, sdp_record): 765 """Adds an SDP service record. 766 Args: 767 sdp_record: The dictionary representing the search record to add. 768 Returns: 769 service_id: The service id to track the service record published. 770 None if failed. 771 """ 772 # Android devices currently have no hooks to modify the SDP record. 773 774 def sdp_clean_up(self): 775 """Cleans up all objects related to SDP. 776 """ 777 self.device.sl4f.sdp_lib.cleanUp() 778 779 def sdp_init(self): 780 """Initializes SDP on the device. 781 """ 782 # Android devices currently have no hooks to modify the SDP record. 783 784 def sdp_remove_service(self, service_id): 785 """Removes a service based on an input id. 786 Args: 787 service_id: The service ID to remove. 788 """ 789 # Android devices currently have no hooks to modify the SDP record. 790 791 def unbond_all_known_devices(self): 792 """ Unbond all known remote devices. 793 """ 794 self.device.droid.bluetoothFactoryReset() 795 796 def unbond_device(self, peer_identifier): 797 """ Unbond peer identifier. 798 799 Args: 800 peer_identifier: The mac address for the peer to unbond. 801 802 """ 803 self.device.droid.bluetoothUnbond(peer_identifier) 804 805 def init_pair(self, peer_identifier, security_level, non_bondable, 806 transport): 807 """ Send an outgoing pairing request the input peer_identifier. 808 809 Android currently does not support setting various security levels or 810 bondable modes. Making them available for other bluetooth_device 811 variants. Depending on the Address type, Android will figure out the 812 transport to pair automatically. 813 814 Args: 815 peer_identifier: A string representing the device id. 816 security_level: Not yet implemented. See Fuchsia device impl. 817 non_bondable: Not yet implemented. See Fuchsia device impl. 818 transport: Not yet implemented. See Fuchsia device impl. 819 820 """ 821 self.dut.droid.bluetoothBond(self.peer_identifier) 822 823 824class FuchsiaBluetoothDevice(BluetoothDevice): 825 """Class wrapper for an Fuchsia Bluetooth device. 826 827 Each object of this class represents a generic luetooth device. 828 Android device and Fuchsia devices are the currently supported devices/ 829 830 Attributes: 831 fuchsia_device: A Fuchsia Bluetooth device. 832 """ 833 834 def __init__(self, fuchsia_device): 835 super().__init__(fuchsia_device) 836 837 def a2dp_initiate_open_stream(self): 838 raise NotImplementedError("{} not yet implemented.".format( 839 inspect.currentframe().f_code.co_name)) 840 841 def start_profile_a2dp_sink(self): 842 """ Starts the A2DP sink profile. 843 """ 844 self.device.start_v1_component("bt-a2dp-sink") 845 846 def stop_profile_a2dp_sink(self): 847 """ Stops the A2DP sink profile. 848 """ 849 self.device.stop_v1_component("bt-a2dp-sink") 850 851 def start_pairing_helper(self): 852 self.device.sl4f.bts_lib.acceptPairing() 853 854 def bluetooth_toggle_state(self, state): 855 """Stub for Fuchsia implementation.""" 856 857 def set_discoverable(self, is_discoverable): 858 """ Sets the device's discoverability. 859 860 Args: 861 is_discoverable: True if discoverable, false if not discoverable 862 """ 863 self.device.sl4f.bts_lib.setDiscoverable(is_discoverable) 864 865 def get_pairing_pin(self): 866 """ Get the pairing pin from the active pairing delegate. 867 """ 868 return self.device.sl4f.bts_lib.getPairingPin()['result'] 869 870 def input_pairing_pin(self, pin): 871 """ Input pairing pin to active pairing delegate. 872 873 Args: 874 pin: The pin to input. 875 """ 876 self.device.sl4f.bts_lib.inputPairingPin(pin) 877 878 def initialize_bluetooth_controller(self): 879 """ Initialize Bluetooth controller for first time use. 880 """ 881 self.device.sl4f.bts_lib.initBluetoothSys() 882 883 def get_local_bluetooth_address(self): 884 """ Returns the Bluetooth local address. 885 """ 886 return self.device.sl4f.bts_lib.getActiveAdapterAddress().get("result") 887 888 def set_bluetooth_local_name(self, name): 889 """ Sets the Bluetooth controller's local name 890 Args: 891 name: The name to set. 892 """ 893 self.device.sl4f.bts_lib.setName(name) 894 895 def gatt_client_write_characteristic_without_response_by_handle( 896 self, peer_identifier, handle, value): 897 """ Perform a GATT Client write Characteristic without response to 898 remote peer GATT server database. 899 900 Args: 901 peer_identifier: The peer to connect to. 902 handle: The characteristic handle. 903 value: The list of bytes to write. 904 Returns: 905 True if success, False if failure. 906 """ 907 if (not self._find_service_id_and_connect_to_service_for_handle( 908 peer_identifier, handle)): 909 self.log.warn( 910 "Unable to find handle {} in GATT server db.".format(handle)) 911 result = self.device.sl4f.gattc_lib.writeCharByIdWithoutResponse( 912 handle, value) 913 if result.get("error") is not None: 914 self.log.error( 915 "Failed to write characteristic handle {} with err: {}".format( 916 handle, result.get("error"))) 917 return False 918 return True 919 920 def gatt_client_write_characteristic_by_handle(self, peer_identifier, 921 handle, offset, value): 922 """ Perform a GATT Client write Characteristic to remote peer GATT 923 server database. 924 925 Args: 926 peer_identifier: The peer to connect to. 927 handle: The characteristic handle. 928 offset: The offset to start writing to. 929 value: The list of bytes to write. 930 Returns: 931 True if success, False if failure. 932 """ 933 if (not self._find_service_id_and_connect_to_service_for_handle( 934 peer_identifier, handle)): 935 self.log.warn( 936 "Unable to find handle {} in GATT server db.".format(handle)) 937 result = self.device.sl4f.gattc_lib.writeCharById( 938 handle, offset, value) 939 if result.get("error") is not None: 940 self.log.error( 941 "Failed to write characteristic handle {} with err: {}".format( 942 handle, result.get("error"))) 943 return False 944 return True 945 946 def gatt_client_write_long_characteristic_by_handle( 947 self, peer_identifier, handle, offset, value, reliable_mode=False): 948 """ Perform a GATT Client write long Characteristic to remote peer GATT 949 server database. 950 951 Args: 952 peer_identifier: The peer to connect to. 953 handle: The characteristic handle. 954 offset: The offset to start writing to. 955 value: The list of bytes to write. 956 reliable_mode: A bool value representing a reliable write or not. 957 Returns: 958 True if success, False if failure. 959 """ 960 if (not self._find_service_id_and_connect_to_service_for_handle( 961 peer_identifier, handle)): 962 self.log.error( 963 "Unable to find handle {} in GATT server db.".format(handle)) 964 return False 965 result = self.device.sl4f.gattc_lib.writeLongCharById( 966 handle, offset, value, reliable_mode) 967 if result.get("error") is not None: 968 self.log.error( 969 "Failed to write long characteristic handle {} with err: {}". 970 format(peer_identifier, result.get("error"))) 971 return False 972 return True 973 974 def gatt_client_write_long_descriptor_by_handle(self, peer_identifier, 975 handle, offset, value): 976 """ Perform a GATT Client write long Descriptor to remote peer GATT 977 server database. 978 979 Args: 980 peer_identifier: The peer to connect to. 981 handle: The descriptor handle. 982 offset: The offset to start writing to. 983 value: The list of bytes to write. 984 Returns: 985 True if success, False if failure. 986 """ 987 if (not self._find_service_id_and_connect_to_service_for_handle( 988 peer_identifier, handle)): 989 self.log.error( 990 "Unable to find handle {} in GATT server db.".format(handle)) 991 return False 992 result = self.device.sl4f.gattc_lib.writeLongDescById( 993 handle, offset, value) 994 if result.get("error") is not None: 995 self.log.error( 996 "Failed to write long descriptor handle {} with err: {}". 997 format(peer_identifier, result.get("error"))) 998 return False 999 return True 1000 1001 def gatt_client_read_characteristic_by_handle(self, peer_identifier, 1002 handle): 1003 """ Perform a GATT Client read Characteristic to remote peer GATT 1004 server database. 1005 1006 Args: 1007 peer_identifier: The peer to connect to. 1008 handle: The characteristic handle. 1009 Returns: 1010 Value of Characteristic if success, None if failure. 1011 """ 1012 if (not self._find_service_id_and_connect_to_service_for_handle( 1013 peer_identifier, handle)): 1014 self.log.warn( 1015 "Unable to find handle {} in GATT server db.".format(handle)) 1016 result = self.device.sl4f.gattc_lib.readCharacteristicById(handle) 1017 if result.get("error") is not None: 1018 self.log.error( 1019 "Failed to read characteristic handle {} with err: {}".format( 1020 handle, result.get("error"))) 1021 return None 1022 return result.get("result") 1023 1024 def gatt_client_read_characteristic_by_uuid(self, peer_identifier, uuid): 1025 """ Perform a GATT Client read Characteristic by uuid to remote peer GATT 1026 server database. 1027 1028 Args: 1029 peer_identifier: The peer to connect to. 1030 uuid: The characteristic uuid. 1031 Returns: 1032 Value of Characteristic if success, None if failure. 1033 """ 1034 if (not self._find_service_id_and_connect_to_service_for_handle( 1035 peer_identifier, uuid, uuid=True)): 1036 self.log.warn( 1037 "Unable to find uuid {} in GATT server db.".format(uuid)) 1038 result = self.device.sl4f.gattc_lib.readCharacteristicByType(uuid) 1039 if result.get("error") is not None: 1040 self.log.error( 1041 "Failed to read characteristic uuid {} with err: {}".format( 1042 uuid, result.get("error"))) 1043 return None 1044 return result.get("result") 1045 1046 def gatt_client_read_long_characteristic_by_handle(self, peer_identifier, 1047 handle, offset, 1048 max_bytes): 1049 """ Perform a GATT Client read Characteristic to remote peer GATT 1050 server database. 1051 1052 Args: 1053 peer_identifier: The peer to connect to. 1054 handle: The characteristic handle. 1055 offset: The offset to start reading. 1056 max_bytes: The max bytes to return for each read. 1057 Returns: 1058 Value of Characteristic if success, None if failure. 1059 """ 1060 if (not self._find_service_id_and_connect_to_service_for_handle( 1061 peer_identifier, handle)): 1062 self.log.warn( 1063 "Unable to find handle {} in GATT server db.".format(handle)) 1064 result = self.device.sl4f.gattc_lib.readLongCharacteristicById( 1065 handle, offset, max_bytes) 1066 if result.get("error") is not None: 1067 self.log.error( 1068 "Failed to read characteristic handle {} with err: {}".format( 1069 handle, result.get("error"))) 1070 return None 1071 return result.get("result") 1072 1073 def gatt_client_enable_notifiy_characteristic_by_handle( 1074 self, peer_identifier, handle): 1075 """ Perform a GATT Client enable Characteristic notification to remote 1076 peer GATT server database. 1077 1078 Args: 1079 peer_identifier: The peer to connect to. 1080 handle: The characteristic handle. 1081 Returns: 1082 True is success, False if failure. 1083 """ 1084 if (not self._find_service_id_and_connect_to_service_for_handle( 1085 peer_identifier, handle)): 1086 self.log.warn( 1087 "Unable to find handle {} in GATT server db.".format(handle)) 1088 result = self.device.sl4f.gattc_lib.enableNotifyCharacteristic(handle) 1089 if result.get("error") is not None: 1090 self.log.error( 1091 "Failed to enable characteristic notifications for handle {} " 1092 "with err: {}".format(handle, result.get("error"))) 1093 return None 1094 return result.get("result") 1095 1096 def gatt_client_disable_notifiy_characteristic_by_handle( 1097 self, peer_identifier, handle): 1098 """ Perform a GATT Client disable Characteristic notification to remote 1099 peer GATT server database. 1100 1101 Args: 1102 peer_identifier: The peer to connect to. 1103 handle: The characteristic handle. 1104 Returns: 1105 True is success, False if failure. 1106 """ 1107 if (not self._find_service_id_and_connect_to_service_for_handle( 1108 peer_identifier, handle)): 1109 self.log.warn( 1110 "Unable to find handle {} in GATT server db.".format(handle)) 1111 result = self.device.sl4f.gattc_lib.disableNotifyCharacteristic(handle) 1112 if result.get("error") is not None: 1113 self.log.error( 1114 "Failed to disable characteristic notifications for handle {} " 1115 "with err: {}".format(peer_identifier, result.get("error"))) 1116 return None 1117 return result.get("result") 1118 1119 def gatt_client_read_descriptor_by_handle(self, peer_identifier, handle): 1120 """ Perform a GATT Client read Descriptor to remote peer GATT server 1121 database. 1122 1123 Args: 1124 peer_identifier: The peer to connect to. 1125 handle: The Descriptor handle. 1126 Returns: 1127 Value of Descriptor if success, None if failure. 1128 """ 1129 if (not self._find_service_id_and_connect_to_service_for_handle( 1130 peer_identifier, handle)): 1131 self.log.warn( 1132 "Unable to find handle {} in GATT server db.".format(handle)) 1133 result = self.device.sl4f.gattc_lib.readDescriptorById(handle) 1134 if result.get("error") is not None: 1135 self.log.error( 1136 "Failed to read descriptor for handle {} with err: {}".format( 1137 peer_identifier, result.get("error"))) 1138 return None 1139 return result.get("result") 1140 1141 def gatt_client_write_descriptor_by_handle(self, peer_identifier, handle, 1142 offset, value): 1143 """ Perform a GATT Client write Descriptor to remote peer GATT server 1144 database. 1145 1146 Args: 1147 peer_identifier: The peer to connect to. 1148 handle: The Descriptor handle. 1149 offset: The offset to start writing at. 1150 value: The list of bytes to write. 1151 Returns: 1152 True if success, False if failure. 1153 """ 1154 if (not self._find_service_id_and_connect_to_service_for_handle( 1155 peer_identifier, handle)): 1156 self.log.warn( 1157 "Unable to find handle {} in GATT server db.".format(handle)) 1158 result = self.device.sl4f.gattc_lib.writeDescriptorById( 1159 handle, offset, value) 1160 if result.get("error") is not None: 1161 self.log.error( 1162 "Failed to write descriptor for handle {} with err: {}".format( 1163 peer_identifier, result.get("error"))) 1164 return None 1165 return True 1166 1167 def gatt_connect(self, peer_identifier, transport, autoconnect): 1168 """ Perform a GATT connection to a perihperal. 1169 1170 Args: 1171 peer_identifier: The peer to connect to. 1172 transport: Not implemented. 1173 autoconnect: Not implemented. 1174 Returns: 1175 True if success, False if failure. 1176 """ 1177 connection_result = self.device.sl4f.gattc_lib.bleConnectToPeripheral( 1178 peer_identifier) 1179 if connection_result.get("error") is not None: 1180 self.log.error("Failed to connect to peer id {}: {}".format( 1181 peer_identifier, connection_result.get("error"))) 1182 return False 1183 return True 1184 1185 def gatt_client_refresh(self, peer_identifier): 1186 """ Perform a GATT Client Refresh of a perihperal. 1187 1188 Clears the internal cache and forces a refresh of the services from the 1189 remote device. In Fuchsia there is no FIDL api to automatically do this 1190 yet. Therefore just read all Characteristics which satisfies the same 1191 requirements. 1192 1193 Args: 1194 peer_identifier: The peer to refresh. 1195 """ 1196 self._read_all_characteristics(peer_identifier) 1197 1198 def gatt_client_discover_characteristic_by_uuid(self, peer_identifier, 1199 uuid): 1200 """ Perform a GATT Client Refresh of a perihperal. 1201 1202 Clears the internal cache and forces a refresh of the services from the 1203 remote device. In Fuchsia there is no FIDL api to automatically do this 1204 yet. Therefore just read all Characteristics which satisfies the same 1205 requirements. 1206 1207 Args: 1208 peer_identifier: The peer to refresh. 1209 """ 1210 self._read_all_characteristics(peer_identifier, uuid) 1211 1212 def gatt_disconnect(self, peer_identifier): 1213 """ Perform a GATT disconnect from a perihperal. 1214 1215 Args: 1216 peer_identifier: The peer to disconnect from. 1217 Returns: 1218 True if success, False if failure. 1219 """ 1220 disconnect_result = self.device.sl4f.gattc_lib.bleDisconnectPeripheral( 1221 peer_identifier) 1222 if disconnect_result.get("error") is not None: 1223 self.log.error("Failed to disconnect from peer id {}: {}".format( 1224 peer_identifier, disconnect_result.get("error"))) 1225 return False 1226 return True 1227 1228 def reset_bluetooth(self): 1229 """Stub for Fuchsia implementation.""" 1230 1231 def sdp_add_search(self, attribute_list, profile_id): 1232 """Adds an SDP search record. 1233 Args: 1234 attribute_list: The list of attributes to set 1235 profile_id: The profile ID to set. 1236 """ 1237 return self.device.sl4f.sdp_lib.addSearch(attribute_list, profile_id) 1238 1239 def sdp_add_service(self, sdp_record): 1240 """Adds an SDP service record. 1241 Args: 1242 sdp_record: The dictionary representing the search record to add. 1243 """ 1244 return self.device.sl4f.sdp_lib.addService(sdp_record) 1245 1246 def sdp_clean_up(self): 1247 """Cleans up all objects related to SDP. 1248 """ 1249 return self.device.sl4f.sdp_lib.cleanUp() 1250 1251 def sdp_init(self): 1252 """Initializes SDP on the device. 1253 """ 1254 return self.device.sl4f.sdp_lib.init() 1255 1256 def sdp_remove_service(self, service_id): 1257 """Removes a service based on an input id. 1258 Args: 1259 service_id: The service ID to remove. 1260 """ 1261 return self.device.sl4f.sdp_lib.init() 1262 1263 def start_le_advertisement(self, adv_data, scan_response, adv_interval, 1264 connectable): 1265 """ Starts an LE advertisement 1266 1267 Args: 1268 adv_data: Advertisement data. 1269 adv_interval: Advertisement interval. 1270 """ 1271 self.device.sl4f.ble_lib.bleStartBleAdvertising( 1272 adv_data, scan_response, adv_interval, connectable) 1273 1274 def stop_le_advertisement(self): 1275 """ Stop active LE advertisement. 1276 """ 1277 self.device.sl4f.ble_lib.bleStopBleAdvertising() 1278 1279 def setup_gatt_server(self, database): 1280 """ Sets up an input GATT server. 1281 1282 Args: 1283 database: A dictionary representing the GATT database to setup. 1284 """ 1285 self.device.sl4f.gatts_lib.publishServer(database) 1286 1287 def close_gatt_server(self): 1288 """ Closes an existing GATT server. 1289 """ 1290 self.device.sl4f.gatts_lib.closeServer() 1291 1292 def le_scan_with_name_filter(self, name, timeout): 1293 """ Scan over LE for a specific device name. 1294 1295 Args: 1296 name: The name filter to set. 1297 timeout: The timeout to wait to find the advertisement. 1298 Returns: 1299 Discovered device id or None 1300 """ 1301 partial_match = True 1302 return le_scan_for_device_by_name(self.device, self.device.log, name, 1303 timeout, partial_match) 1304 1305 def log_info(self, log): 1306 """ Log directly onto the device. 1307 1308 Args: 1309 log: The informative log. 1310 """ 1311 self.device.sl4f.logging_lib.logI(log) 1312 1313 def unbond_all_known_devices(self): 1314 """ Unbond all known remote devices. 1315 """ 1316 try: 1317 device_list = self.device.sl4f.bts_lib.getKnownRemoteDevices( 1318 )['result'] 1319 for device_info in device_list: 1320 device = device_list[device_info] 1321 if device['bonded']: 1322 self.device.sl4f.bts_lib.forgetDevice(device['id']) 1323 except Exception as err: 1324 self.log.err("Unable to unbond all devices: {}".format(err)) 1325 1326 def unbond_device(self, peer_identifier): 1327 """ Unbond peer identifier. 1328 1329 Args: 1330 peer_identifier: The peer identifier for the peer to unbond. 1331 1332 """ 1333 self.device.sl4f.bts_lib.forgetDevice(peer_identifier) 1334 1335 def _find_service_id_and_connect_to_service_for_handle( 1336 self, peer_identifier, handle, uuid=False): 1337 fail_err = "Failed to find handle {} in Peer database." 1338 if uuid: 1339 handle = handle.lower() 1340 try: 1341 services = self.device.sl4f.gattc_lib.listServices(peer_identifier) 1342 for service in services['result']: 1343 service_id = service['id'] 1344 self.device.sl4f.gattc_lib.connectToService( 1345 peer_identifier, service_id) 1346 chars = self.device.sl4f.gattc_lib.discoverCharacteristics() 1347 1348 for char in chars['result']: 1349 char_id = char['id'] 1350 if uuid: 1351 char_id = char['uuid_type'] 1352 if handle == char_id: 1353 return True 1354 descriptors = char['descriptors'] 1355 for desc in descriptors: 1356 desc_id = desc["id"] 1357 if uuid: 1358 desc_id = desc['uuid_type'] 1359 if handle == desc_id: 1360 return True 1361 except Exception as err: 1362 self.log.error(fail_err.format(err)) 1363 return False 1364 1365 def _read_all_characteristics(self, peer_identifier, uuid=None): 1366 fail_err = "Failed to read all characteristics with: {}" 1367 try: 1368 services = self.device.sl4f.gattc_lib.listServices(peer_identifier) 1369 for service in services['result']: 1370 service_id = service['id'] 1371 service_uuid = service['uuid_type'] 1372 self.device.sl4f.gattc_lib.connectToService( 1373 peer_identifier, service_id) 1374 chars = self.device.sl4f.gattc_lib.discoverCharacteristics() 1375 self.log.info( 1376 "Reading chars in service uuid: {}".format(service_uuid)) 1377 1378 for char in chars['result']: 1379 char_id = char['id'] 1380 char_uuid = char['uuid_type'] 1381 if uuid and uuid.lower() not in char_uuid.lower(): 1382 continue 1383 try: 1384 read_val = \ 1385 self.device.sl4f.gattc_lib.readCharacteristicById( 1386 char_id) 1387 self.log.info( 1388 "\tCharacteristic uuid / Value: {} / {}".format( 1389 char_uuid, read_val['result'])) 1390 str_value = "" 1391 for val in read_val['result']: 1392 str_value += chr(val) 1393 self.log.info("\t\tstr val: {}".format(str_value)) 1394 except Exception as err: 1395 self.log.error(err) 1396 except Exception as err: 1397 self.log.error(fail_err.forma(err)) 1398 1399 def _perform_read_all_descriptors(self, peer_identifier): 1400 fail_err = "Failed to read all characteristics with: {}" 1401 try: 1402 services = self.device.sl4f.gattc_lib.listServices(peer_identifier) 1403 for service in services['result']: 1404 service_id = service['id'] 1405 service_uuid = service['uuid_type'] 1406 self.device.sl4f.gattc_lib.connectToService( 1407 peer_identifier, service_id) 1408 chars = self.device.sl4f.gattc_lib.discoverCharacteristics() 1409 self.log.info( 1410 "Reading descs in service uuid: {}".format(service_uuid)) 1411 1412 for char in chars['result']: 1413 char_id = char['id'] 1414 char_uuid = char['uuid_type'] 1415 descriptors = char['descriptors'] 1416 self.log.info( 1417 "\tReading descs in char uuid: {}".format(char_uuid)) 1418 for desc in descriptors: 1419 desc_id = desc["id"] 1420 desc_uuid = desc["uuid_type"] 1421 try: 1422 read_val = self.device.sl4f.gattc_lib.readDescriptorById( 1423 desc_id) 1424 self.log.info( 1425 "\t\tDescriptor uuid / Value: {} / {}".format( 1426 desc_uuid, read_val['result'])) 1427 except Exception as err: 1428 pass 1429 except Exception as err: 1430 self.log.error(fail_err.format(err)) 1431 1432 def init_pair(self, peer_identifier, security_level, non_bondable, 1433 transport): 1434 """ Send an outgoing pairing request the input peer_identifier. 1435 1436 Android currently does not support setting various security levels or 1437 bondable modes. Making them available for other bluetooth_device 1438 variants. Depending on the Address type, Android will figure out the 1439 transport to pair automatically. 1440 1441 Args: 1442 peer_identifier: A string representing the device id. 1443 security_level: The security level required for this pairing request 1444 represented as a u64. (Only for LE pairing) 1445 Available Values 1446 1 - ENCRYPTED: Encrypted without MITM protection 1447 (unauthenticated) 1448 2 - AUTHENTICATED: Encrypted with MITM protection 1449 (authenticated) 1450 None: No pairing security level. 1451 non_bondable: A bool representing whether the pairing mode is 1452 bondable or not. None is also accepted. False if bondable, True 1453 if non-bondable 1454 transport: A u64 representing the transport type. 1455 Available Values 1456 1 - BREDR: Classic BR/EDR transport 1457 2 - LE: Bluetooth Low Energy Transport 1458 Returns: 1459 True if successful, False if failed. 1460 """ 1461 try: 1462 self.device.sl4f.bts_lib.pair(peer_identifier, security_level, 1463 non_bondable, transport) 1464 return True 1465 except Exception as err: 1466 fail_err = "Failed to pair to peer_identifier {} with: {}".format( 1467 peer_identifier) 1468 self.log.error(fail_err.format(err)) 1469