1#!/usr/bin/env python3 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Python script for wrappers to various libraries. 18 19Class CmdInput inherts from the cmd library. 20 21Functions that start with "do_" have a method 22signature that doesn't match the actual command 23line command and that is intended. This is so the 24"help" command knows what to display (in this case 25the documentation of the command itself). 26 27For example: 28Looking at the function "do_tool_set_target_device_name" 29has the inputs self and line which is expected of this type 30of method signature. When the "help" command is done on the 31method name you get the function documentation as such: 32 33(Cmd) help tool_set_target_device_name 34 35 Description: Reset the target device name. 36 Input(s): 37 device_name: Required. The advertising name to connect to. 38 Usage: tool_set_target_device_name new_target_device name 39 Examples: 40 tool_set_target_device_name le_watch 41 42This is all to say this documentation pattern is expected. 43 44""" 45 46from acts_contrib.test_utils.audio_analysis_lib.check_quality import quality_analysis 47from acts_contrib.test_utils.bt.bt_constants import audio_bits_per_sample_32 48from acts_contrib.test_utils.bt.bt_constants import audio_sample_rate_48000 49from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device 50from acts_contrib.test_utils.bt.bt_constants import bt_attribute_values 51from acts_contrib.test_utils.bt.bt_constants import sig_appearance_constants 52from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants 53from acts_contrib.test_utils.fuchsia.sdp_records import sdp_pts_record_list 54 55import acts_contrib.test_utils.bt.gatt_test_database as gatt_test_database 56 57import cmd 58import pprint 59import time 60"""Various Global Strings""" 61BASE_UUID = sig_uuid_constants['BASE_UUID'] 62CMD_LOG = "CMD {} result: {}" 63FAILURE = "CMD {} threw exception: {}" 64BASIC_ADV_NAME = "fs_test" 65 66 67class CommandInput(cmd.Cmd): 68 ble_adv_interval = 1000 69 ble_adv_appearance = None 70 ble_adv_data_include_tx_power_level = False 71 ble_adv_include_name = True 72 ble_adv_include_scan_response = False 73 ble_adv_name = "fs_test" 74 ble_adv_data_manufacturer_data = None 75 ble_adv_data_service_data = None 76 ble_adv_data_service_uuid_list = None 77 ble_adv_data_uris = None 78 79 bt_control_ids = [] 80 bt_control_names = [] 81 bt_control_devices = [] 82 bt_scan_poll_timer = 0.5 83 target_device_name = "" 84 le_ids = [] 85 unique_mac_addr_id = None 86 87 def setup_vars(self, dut, target_device_name, log): 88 self.pri_dut = dut 89 # Note: test_dut is the start of a slow conversion from a Fuchsia specific 90 # Tool to an abstract_device tool. Only commands that use test_dut will work 91 # Otherwise this tool is primarially targeted at Fuchsia devices. 92 self.test_dut = create_bluetooth_device(self.pri_dut) 93 self.test_dut.initialize_bluetooth_controller() 94 self.target_device_name = target_device_name 95 self.log = log 96 97 def emptyline(self): 98 pass 99 100 def do_EOF(self, line): 101 "End Script" 102 return True 103 104 """ Useful Helper functions and cmd line tooling """ 105 106 def str_to_bool(self, s): 107 if s.lower() == 'true': 108 return True 109 elif s.lower() == 'false': 110 return False 111 112 def _find_unique_id_over_le(self): 113 scan_filter = {"name_substring": self.target_device_name} 114 self.unique_mac_addr_id = None 115 self.pri_dut.sl4f.gattc_lib.bleStartBleScan(scan_filter) 116 tries = 10 117 for i in range(tries): 118 time.sleep(self.bt_scan_poll_timer) 119 scan_res = self.pri_dut.sl4f.gattc_lib.bleGetDiscoveredDevices( 120 )['result'] 121 for device in scan_res: 122 name, did, connectable = device["name"], device["id"], device[ 123 "connectable"] 124 if (self.target_device_name in name): 125 self.unique_mac_addr_id = did 126 self.log.info( 127 "Successfully found device: name, id: {}, {}".format( 128 name, did)) 129 break 130 if self.unique_mac_addr_id: 131 break 132 self.pri_dut.sl4f.gattc_lib.bleStopBleScan() 133 134 def _find_unique_id_over_bt_control(self): 135 self.unique_mac_addr_id = None 136 self.bt_control_devices = [] 137 self.pri_dut.sl4f.bts_lib.requestDiscovery(True) 138 tries = 10 139 for i in range(tries): 140 if self.unique_mac_addr_id: 141 break 142 time.sleep(self.bt_scan_poll_timer) 143 device_list = self.pri_dut.sl4f.bts_lib.getKnownRemoteDevices( 144 )['result'] 145 for id_dict in device_list: 146 device = device_list[id_dict] 147 self.bt_control_devices.append(device) 148 name = None 149 if device['name'] is not None: 150 name = device['name'] 151 did, address = device['id'], device['address'] 152 153 self.bt_control_ids.append(did) 154 if name is not None: 155 self.bt_control_names.append(name) 156 if self.target_device_name in name: 157 self.unique_mac_addr_id = did 158 self.log.info( 159 "Successfully found device: name, id, address: {}, {}, {}" 160 .format(name, did, address)) 161 break 162 self.pri_dut.sl4f.bts_lib.requestDiscovery(False) 163 164 def do_tool_take_bt_snoop_log(self, custom_name): 165 """ 166 Description: Takes the bt snoop log from the Fuchsia device. 167 Logs will show up in your config files' logpath directory. 168 169 Input(s): 170 custom_name: Optional. Override the default pcap file name. 171 172 Usage: tool_set_target_device_name new_target_device name 173 Examples: 174 tool_take_bt_snoop_log connection_error 175 tool_take_bt_snoop_log 176 """ 177 self.pri_dut.take_bt_snoop_log(custom_name) 178 179 def do_tool_refresh_unique_id(self, line): 180 """ 181 Description: Refresh command line tool mac unique id. 182 Usage: 183 Examples: 184 tool_refresh_unique_id 185 """ 186 try: 187 self._find_unique_id_over_le() 188 except Exception as err: 189 self.log.error( 190 "Failed to scan or find scan result: {}".format(err)) 191 192 def do_tool_refresh_unique_id_using_bt_control(self, line): 193 """ 194 Description: Refresh command line tool mac unique id. 195 Usage: 196 Examples: 197 tool_refresh_unique_id_using_bt_control 198 """ 199 try: 200 self._find_unique_id_over_bt_control() 201 except Exception as err: 202 self.log.error( 203 "Failed to scan or find scan result: {}".format(err)) 204 205 def do_tool_set_target_device_name(self, line): 206 """ 207 Description: Reset the target device name. 208 Input(s): 209 device_name: Required. The advertising name to connect to. 210 Usage: tool_set_target_device_name new_target_device name 211 Examples: 212 tool_set_target_device_name le_watch 213 """ 214 self.log.info("Setting target_device_name to: {}".format(line)) 215 self.target_device_name = line 216 217 def do_tool_set_unique_mac_addr_id(self, line): 218 """ 219 Description: Sets the unique mac address id (Specific to Fuchsia) 220 Input(s): 221 device_id: Required. The id to set the unique mac address id to 222 Usage: tool_set_unique_mac_addr_id device_id 223 Examples: 224 tool_set_unique_mac_addr_id 7fb2cae53aad9e0d 225 """ 226 self.unique_mac_addr_id = line 227 228 """Begin BLE advertise wrappers""" 229 230 def complete_ble_adv_data_include_name(self, text, line, begidx, endidx): 231 roles = ["true", "false"] 232 if not text: 233 completions = roles 234 else: 235 completions = [s for s in roles if s.startswith(text)] 236 return completions 237 238 def do_ble_adv_data_include_name(self, line): 239 cmd = "Include name in the advertisement." 240 try: 241 self.ble_adv_include_name = self.str_to_bool(line) 242 except Exception as err: 243 self.log.error(FAILURE.format(cmd, err)) 244 245 def do_ble_adv_data_set_name(self, line): 246 cmd = "Set the name to be included in the advertisement." 247 try: 248 self.ble_adv_name = line 249 except Exception as err: 250 self.log.error(FAILURE.format(cmd, err)) 251 252 def complete_ble_adv_data_set_appearance(self, text, line, begidx, endidx): 253 if not text: 254 completions = list(sig_appearance_constants.keys()) 255 else: 256 completions = [ 257 s for s in sig_appearance_constants.keys() 258 if s.startswith(text) 259 ] 260 return completions 261 262 def do_ble_adv_data_set_appearance(self, line): 263 cmd = "Set the appearance to known SIG values." 264 try: 265 self.ble_adv_appearance = line 266 except Exception as err: 267 self.log.error(FAILURE.format(cmd, err)) 268 269 def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx, 270 endidx): 271 options = ['true', 'false'] 272 if not text: 273 completions = list(options)[:] 274 else: 275 completions = [s for s in options if s.startswith(text)] 276 return completions 277 278 def do_ble_adv_data_include_tx_power_level(self, line): 279 """Include the tx_power_level in the advertising data. 280 Description: Adds tx_power_level to the advertisement data to the BLE 281 advertisement. 282 Input(s): 283 value: Required. True or False 284 Usage: ble_adv_data_include_tx_power_level bool_value 285 Examples: 286 ble_adv_data_include_tx_power_level true 287 ble_adv_data_include_tx_power_level false 288 """ 289 cmd = "Include tx_power_level in advertisement." 290 try: 291 self.ble_adv_data_include_tx_power_level = self.str_to_bool(line) 292 except Exception as err: 293 self.log.info(FAILURE.format(cmd, err)) 294 295 def complete_ble_adv_include_scan_response(self, text, line, begidx, 296 endidx): 297 options = ['true', 'false'] 298 if not text: 299 completions = list(options)[:] 300 else: 301 completions = [s for s in options if s.startswith(text)] 302 return completions 303 304 def do_ble_adv_include_scan_response(self, line): 305 """Include scan response in advertisement. inputs: [true|false] 306 Note: Currently just sets the scan response data to the 307 Advertisement data. 308 """ 309 cmd = "Include tx_power_level in advertisement." 310 try: 311 self.ble_adv_include_scan_response = self.str_to_bool(line) 312 except Exception as err: 313 self.log.info(FAILURE.format(cmd, err)) 314 315 def do_ble_adv_data_add_manufacturer_data(self, line): 316 """Include manufacturer id and data to the advertisment 317 Description: Adds manufacturer data to the BLE advertisement. 318 Input(s): 319 id: Required. The int representing the manufacturer id. 320 data: Required. The string representing the data. 321 Usage: ble_adv_data_add_manufacturer_data id data 322 Examples: 323 ble_adv_data_add_manufacturer_data 1 test 324 """ 325 cmd = "Include manufacturer id and data to the advertisment." 326 try: 327 328 info = line.split() 329 if self.ble_adv_data_manufacturer_data is None: 330 self.ble_adv_data_manufacturer_data = [] 331 self.ble_adv_data_manufacturer_data.append({ 332 "id": int(info[0]), 333 "data": info[1] 334 }) 335 except Exception as err: 336 self.log.info(FAILURE.format(cmd, err)) 337 338 def do_ble_adv_data_add_service_data(self, line): 339 """Include service data to the advertisment 340 Description: Adds service data to the BLE advertisement. 341 Input(s): 342 uuid: Required. The string representing the uuid. 343 data: Required. The string representing the data. 344 Usage: ble_adv_data_add_service_data uuid data 345 Examples: 346 ble_adv_data_add_service_data 00001801-0000-1000-8000-00805f9b34fb test 347 """ 348 cmd = "Include manufacturer id and data to the advertisment." 349 try: 350 info = line.split() 351 if self.ble_adv_data_service_data is None: 352 self.ble_adv_data_service_data = [] 353 self.ble_adv_data_service_data.append({ 354 "uuid": info[0], 355 "data": info[1] 356 }) 357 except Exception as err: 358 self.log.info(FAILURE.format(cmd, err)) 359 360 def do_ble_adv_add_service_uuid_list(self, line): 361 """Include a list of service uuids to the advertisment: 362 Description: Adds service uuid list to the BLE advertisement. 363 Input(s): 364 uuid: Required. A list of N string UUIDs to add. 365 Usage: ble_adv_add_service_uuid_list uuid0 uuid1 ... uuidN 366 Examples: 367 ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 368 ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 00001802-0000-1000-8000-00805f9b34fb 369 """ 370 cmd = "Include service uuid list to the advertisment data." 371 try: 372 self.ble_adv_data_service_uuid_list = line 373 except Exception as err: 374 self.log.info(FAILURE.format(cmd, err)) 375 376 def do_ble_adv_data_set_uris(self, uris): 377 """Set the URIs of the LE advertisement data: 378 Description: Adds list of String UIRs 379 See (RFC 3986 1.1.2 https://tools.ietf.org/html/rfc3986) 380 Valid URI examples: 381 ftp://ftp.is.co.za/rfc/rfc1808.txt 382 http://www.ietf.org/rfc/rfc2396.txt 383 ldap://[2001:db8::7]/c=GB?objectClass?one 384 mailto:John.Doe@example.com 385 news:comp.infosystems.www.servers.unix 386 tel:+1-816-555-1212 387 telnet://192.0.2.16:80/ 388 urn:oasis:names:specification:docbook:dtd:xml:4.1.2 389 Input(s): 390 uris: Required. A list of URIs to add. 391 Usage: ble_adv_data_set_uris uri0 uri1 ... uriN 392 Examples: 393 ble_adv_data_set_uris telnet://192.0.2.16:80/ 394 ble_adv_data_set_uris tel:+1-816-555-1212 395 """ 396 cmd = "Set the appearance to known SIG values." 397 try: 398 self.ble_adv_data_uris = uris.split() 399 except Exception as err: 400 self.log.error(FAILURE.format(cmd, err)) 401 402 def start_advertisement(self, connectable): 403 """ Handle setting advertising data and the advertisement 404 Note: After advertisement is successful, clears values set for 405 * Manufacturer data 406 * Appearance information 407 * Scan Response 408 * Service UUIDs 409 * URI list 410 Args: 411 connectable: Bool of whether to start a connectable 412 advertisement or not. 413 """ 414 adv_data_name = self.ble_adv_name 415 if not self.ble_adv_include_name: 416 adv_data_name = None 417 418 manufacturer_data = self.ble_adv_data_manufacturer_data 419 420 tx_power_level = None 421 if self.ble_adv_data_include_tx_power_level: 422 tx_power_level = 1 # Not yet implemented so set to 1 423 424 scan_response = self.ble_adv_include_scan_response 425 426 adv_data = { 427 "name": adv_data_name, 428 "appearance": self.ble_adv_appearance, 429 "service_data": self.ble_adv_data_service_data, 430 "tx_power_level": tx_power_level, 431 "service_uuids": self.ble_adv_data_service_uuid_list, 432 "manufacturer_data": manufacturer_data, 433 "uris": self.ble_adv_data_uris, 434 } 435 436 if not self.ble_adv_include_scan_response: 437 scan_response = None 438 else: 439 scan_response = adv_data 440 441 result = self.pri_dut.sl4f.ble_lib.bleStartBleAdvertising( 442 adv_data, scan_response, self.ble_adv_interval, connectable) 443 self.log.info("Result of starting advertisement: {}".format(result)) 444 self.ble_adv_data_manufacturer_data = None 445 self.ble_adv_appearance = None 446 self.ble_adv_include_scan_response = False 447 self.ble_adv_data_service_uuid_list = None 448 self.ble_adv_data_uris = None 449 self.ble_adv_data_service_data = None 450 451 def do_ble_start_generic_connectable_advertisement(self, line): 452 """ 453 Description: Start a connectable LE advertisement 454 455 Usage: ble_start_generic_connectable_advertisement 456 """ 457 cmd = "Start a connectable LE advertisement" 458 try: 459 connectable = True 460 self.start_advertisement(connectable) 461 except Exception as err: 462 self.log.error(FAILURE.format(cmd, err)) 463 464 def do_ble_start_generic_nonconnectable_advertisement(self, line): 465 """ 466 Description: Start a non-connectable LE advertisement 467 468 Usage: ble_start_generic_nonconnectable_advertisement 469 """ 470 cmd = "Start a nonconnectable LE advertisement" 471 try: 472 connectable = False 473 self.start_advertisement(connectable) 474 except Exception as err: 475 self.log.error(FAILURE.format(cmd, err)) 476 477 def do_ble_stop_advertisement(self, line): 478 """ 479 Description: Stop a BLE advertisement. 480 Usage: ble_stop_advertisement 481 """ 482 cmd = "Stop a connectable LE advertisement" 483 try: 484 self.pri_dut.sl4f.ble_lib.bleStopBleAdvertising() 485 except Exception as err: 486 self.log.error(FAILURE.format(cmd, err)) 487 488 """End BLE advertise wrappers""" 489 """Begin GATT client wrappers""" 490 491 def complete_gattc_connect_by_id(self, text, line, begidx, endidx): 492 if not text: 493 completions = list(self.le_ids)[:] 494 else: 495 completions = [s for s in self.le_ids if s.startswith(text)] 496 return completions 497 498 def do_gattc_connect_by_id(self, line): 499 """ 500 Description: Connect to a LE peripheral. 501 Input(s): 502 device_id: Required. The unique device ID from Fuchsia 503 discovered devices. 504 Usage: 505 Examples: 506 gattc_connect device_id 507 """ 508 cmd = "Connect to a LE peripheral by input ID." 509 try: 510 511 connection_status = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral( 512 line) 513 self.log.info("Connection status: {}".format( 514 pprint.pformat(connection_status))) 515 except Exception as err: 516 self.log.error(FAILURE.format(cmd, err)) 517 518 def do_gattc_connect(self, line): 519 """ 520 Description: Connect to a LE peripheral. 521 Optional input: device_name 522 Input(s): 523 device_name: Optional. The peripheral ID to connect to. 524 Usage: 525 Examples: 526 gattc_connect 527 gattc_connect eddystone_123 528 """ 529 cmd = "Connect to a LE peripheral." 530 try: 531 if len(line) > 0: 532 self.target_device_name = line 533 self.unique_mac_addr_id = None 534 if not self.unique_mac_addr_id: 535 try: 536 self._find_unique_id() 537 except Exception as err: 538 self.log.info("Failed to scan or find device.") 539 return 540 connection_status = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral( 541 self.unique_mac_addr_id) 542 self.log.info("Connection status: {}".format( 543 pprint.pformat(connection_status))) 544 except Exception as err: 545 self.log.error(FAILURE.format(cmd, err)) 546 547 def do_gattc_connect_disconnect_iterations(self, line): 548 """ 549 Description: Connect then disconnect to a LE peripheral multiple times. 550 Input(s): 551 iterations: Required. The number of iterations to run. 552 Usage: 553 Examples: 554 gattc_connect_disconnect_iterations 10 555 """ 556 cmd = "Connect to a LE peripheral." 557 try: 558 if not self.unique_mac_addr_id: 559 try: 560 self._find_unique_id() 561 except Exception as err: 562 self.log.info("Failed to scan or find device.") 563 return 564 for i in range(int(line)): 565 self.log.info("Running iteration {}".format(i + 1)) 566 connection_status = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral( 567 self.unique_mac_addr_id) 568 self.log.info("Connection status: {}".format( 569 pprint.pformat(connection_status))) 570 time.sleep(4) 571 disc_status = self.pri_dut.sl4f.gattc_lib.bleDisconnectPeripheral( 572 self.unique_mac_addr_id) 573 self.log.info("Disconnect status: {}".format(disc_status)) 574 time.sleep(3) 575 except Exception as err: 576 self.log.error(FAILURE.format(cmd, err)) 577 578 def do_gattc_disconnect(self, line): 579 """ 580 Description: Disconnect from LE peripheral. 581 Assumptions: Already connected to a peripheral. 582 Usage: 583 Examples: 584 gattc_disconnect 585 """ 586 cmd = "Disconenct from LE peripheral." 587 try: 588 disconnect_status = self.pri_dut.sl4f.gattc_lib.bleDisconnectPeripheral( 589 self.unique_mac_addr_id) 590 self.log.info("Disconnect status: {}".format(disconnect_status)) 591 except Exception as err: 592 self.log.error(FAILURE.format(cmd, err)) 593 594 def do_gattc_list_services(self, discover_chars): 595 """ 596 Description: List services from LE peripheral. 597 Assumptions: Already connected to a peripheral. 598 Input(s): 599 discover_chars: Optional. An optional input to discover all 600 characteristics on the service. 601 Usage: 602 Examples: 603 gattc_list_services 604 gattc_list_services true 605 """ 606 cmd = "List services from LE peripheral." 607 try: 608 609 services = self.pri_dut.sl4f.gattc_lib.listServices( 610 self.unique_mac_addr_id) 611 self.log.info("Discovered Services: \n{}".format( 612 pprint.pformat(services))) 613 discover_characteristics = self.str_to_bool(discover_chars) 614 if discover_chars: 615 for service in services.get('result'): 616 self.pri_dut.sl4f.gattc_lib.connectToService( 617 self.unique_mac_addr_id, service.get('id')) 618 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics( 619 ) 620 self.log.info("Discovered chars:\n{}".format( 621 pprint.pformat(chars))) 622 623 except Exception as err: 624 self.log.error(FAILURE.format(cmd, err)) 625 626 def do_gattc_connect_to_service(self, line): 627 """ 628 Description: Connect to Peripheral GATT server service. 629 Assumptions: Already connected to peripheral. 630 Input(s): 631 service_id: Required. The service id reference on the GATT server. 632 Usage: 633 Examples: 634 gattc_connect_to_service service_id 635 """ 636 cmd = "GATT client connect to GATT server service." 637 try: 638 self.pri_dut.sl4f.gattc_lib.connectToService( 639 self.unique_mac_addr_id, int(line)) 640 except Exception as err: 641 self.log.error(FAILURE.format(cmd, err)) 642 643 def do_gattc_discover_characteristics(self, line): 644 """ 645 Description: Discover characteristics from a connected service. 646 Assumptions: Already connected to a GATT server service. 647 Usage: 648 Examples: 649 gattc_discover_characteristics 650 """ 651 cmd = "Discover and list characteristics from a GATT server." 652 try: 653 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 654 self.log.info("Discovered chars:\n{}".format( 655 pprint.pformat(chars))) 656 except Exception as err: 657 self.log.error(FAILURE.format(cmd, err)) 658 659 def do_gattc_notify_all_chars(self, line): 660 """ 661 Description: Enable all notifications on all Characteristics on 662 a GATT server. 663 Assumptions: Basic GATT connection made. 664 Usage: 665 Examples: 666 gattc_notify_all_chars 667 """ 668 cmd = "Read all characteristics from the GATT service." 669 try: 670 services = self.pri_dut.sl4f.gattc_lib.listServices( 671 self.unique_mac_addr_id) 672 for service in services['result']: 673 service_id = service['id'] 674 service_uuid = service['uuid_type'] 675 self.pri_dut.sl4f.gattc_lib.connectToService( 676 self.unique_mac_addr_id, service_id) 677 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 678 print("Reading chars in service uuid: {}".format(service_uuid)) 679 680 for char in chars['result']: 681 char_id = char['id'] 682 char_uuid = char['uuid_type'] 683 # quick char filter for apple-4 test... remove later 684 print("found uuid {}".format(char_uuid)) 685 try: 686 self.pri_dut.sl4f.gattc_lib.enableNotifyCharacteristic( 687 char_id) 688 except Exception as err: 689 print("error enabling notification") 690 except Exception as err: 691 self.log.error(FAILURE.format(cmd, err)) 692 693 def do_gattc_read_all_chars(self, line): 694 """ 695 Description: Read all Characteristic values from a GATT server across 696 all services. 697 Assumptions: Basic GATT connection made. 698 Usage: 699 Examples: 700 gattc_read_all_chars 701 """ 702 cmd = "Read all characteristics from the GATT service." 703 try: 704 services = self.pri_dut.sl4f.gattc_lib.listServices( 705 self.unique_mac_addr_id) 706 for service in services['result']: 707 service_id = service['id'] 708 service_uuid = service['uuid_type'] 709 self.pri_dut.sl4f.gattc_lib.connectToService( 710 self.unique_mac_addr_id, service_id) 711 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 712 print("Reading chars in service uuid: {}".format(service_uuid)) 713 714 for char in chars['result']: 715 char_id = char['id'] 716 char_uuid = char['uuid_type'] 717 try: 718 read_val = \ 719 self.pri_dut.sl4f.gattc_lib.readCharacteristicById( 720 char_id) 721 print(" Characteristic uuid / Value: {} / {}".format( 722 char_uuid, read_val['result'])) 723 str_value = "" 724 for val in read_val['result']: 725 str_value += chr(val) 726 print(" str val: {}".format(str_value)) 727 except Exception as err: 728 print(err) 729 except Exception as err: 730 self.log.error(FAILURE.format(cmd, err)) 731 732 def do_gattc_read_all_desc(self, line): 733 """ 734 Description: Read all Descriptors values from a GATT server across 735 all services. 736 Assumptions: Basic GATT connection made. 737 Usage: 738 Examples: 739 gattc_read_all_chars 740 """ 741 cmd = "Read all descriptors from the GATT service." 742 try: 743 services = self.pri_dut.sl4f.gattc_lib.listServices( 744 self.unique_mac_addr_id) 745 for service in services['result']: 746 service_id = service['id'] 747 service_uuid = service['uuid_type'] 748 self.pri_dut.sl4f.gattc_lib.connectToService( 749 self.unique_mac_addr_id, service_id) 750 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 751 print("Reading descs in service uuid: {}".format(service_uuid)) 752 753 for char in chars['result']: 754 char_id = char['id'] 755 char_uuid = char['uuid_type'] 756 descriptors = char['descriptors'] 757 print(" Reading descs in char uuid: {}".format(char_uuid)) 758 for desc in descriptors: 759 desc_id = desc["id"] 760 desc_uuid = desc["uuid_type"] 761 try: 762 read_val = self.pri_dut.sl4f.gattc_lib.readDescriptorById( 763 desc_id) 764 print(" Descriptor uuid / Value: {} / {}".format( 765 desc_uuid, read_val['result'])) 766 except Exception as err: 767 pass 768 except Exception as err: 769 self.log.error(FAILURE.format(cmd, err)) 770 771 def do_gattc_write_all_desc(self, line): 772 """ 773 Description: Write a value to all Descriptors on the GATT server. 774 Assumptions: Basic GATT connection made. 775 Input(s): 776 offset: Required. The offset to start writing to. 777 size: Required. The size of bytes to write (value will be generated). 778 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 779 Usage: 780 Examples: 781 gattc_write_all_desc 0 100 782 gattc_write_all_desc 10 2 783 """ 784 cmd = "Read all descriptors from the GATT service." 785 try: 786 args = line.split() 787 if len(args) != 2: 788 self.log.info("2 Arguments required: [Offset] [Size]") 789 return 790 offset = int(args[0]) 791 size = args[1] 792 write_value = [] 793 for i in range(int(size)): 794 write_value.append(i % 256) 795 services = self.pri_dut.sl4f.gattc_lib.listServices( 796 self.unique_mac_addr_id) 797 for service in services['result']: 798 service_id = service['id'] 799 service_uuid = service['uuid_type'] 800 self.pri_dut.sl4f.gattc_lib.connectToService( 801 self.unique_mac_addr_id, service_id) 802 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 803 print("Writing descs in service uuid: {}".format(service_uuid)) 804 805 for char in chars['result']: 806 char_id = char['id'] 807 char_uuid = char['uuid_type'] 808 descriptors = char['descriptors'] 809 print(" Reading descs in char uuid: {}".format(char_uuid)) 810 for desc in descriptors: 811 desc_id = desc["id"] 812 desc_uuid = desc["uuid_type"] 813 try: 814 write_val = self.pri_dut.sl4f.gattc_lib.writeDescriptorById( 815 desc_id, offset, write_value) 816 print(" Descriptor uuid / Result: {} / {}".format( 817 desc_uuid, write_val['result'])) 818 except Exception as err: 819 pass 820 except Exception as err: 821 self.log.error(FAILURE.format(cmd, err)) 822 823 def do_gattc_read_all_long_desc(self, line): 824 """ 825 Description: Read all long Characteristic Descriptors 826 Assumptions: Basic GATT connection made. 827 Input(s): 828 offset: Required. The offset to start reading from. 829 max_bytes: Required. The max size of bytes to return. 830 Usage: 831 Examples: 832 gattc_read_all_long_desc 0 100 833 gattc_read_all_long_desc 10 20 834 """ 835 cmd = "Read all long descriptors from the GATT service." 836 try: 837 args = line.split() 838 if len(args) != 2: 839 self.log.info("2 Arguments required: [Offset] [Size]") 840 return 841 offset = int(args[0]) 842 max_bytes = int(args[1]) 843 services = self.pri_dut.sl4f.ble_lib.bleListServices( 844 self.unique_mac_addr_id) 845 for service in services['result']: 846 service_id = service['id'] 847 service_uuid = service['uuid_type'] 848 self.pri_dut.sl4f.gattc_lib.connectToService( 849 self.unique_mac_addr_id, service_id) 850 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 851 print("Reading descs in service uuid: {}".format(service_uuid)) 852 853 for char in chars['result']: 854 char_id = char['id'] 855 char_uuid = char['uuid_type'] 856 descriptors = char['descriptors'] 857 print(" Reading descs in char uuid: {}".format(char_uuid)) 858 for desc in descriptors: 859 desc_id = desc["id"] 860 desc_uuid = desc["uuid_type"] 861 try: 862 read_val = self.pri_dut.sl4f.gattc_lib.readLongDescriptorById( 863 desc_id, offset, max_bytes) 864 print(" Descriptor uuid / Result: {} / {}".format( 865 desc_uuid, read_val['result'])) 866 except Exception as err: 867 pass 868 except Exception as err: 869 self.log.error(FAILURE.format(cmd, err)) 870 871 def do_gattc_read_all_long_char(self, line): 872 """ 873 Description: Read all long Characteristic 874 Assumptions: Basic GATT connection made. 875 Input(s): 876 offset: Required. The offset to start reading from. 877 max_bytes: Required. The max size of bytes to return. 878 Usage: 879 Examples: 880 gattc_read_all_long_char 0 100 881 gattc_read_all_long_char 10 20 882 """ 883 cmd = "Read all long Characteristics from the GATT service." 884 try: 885 args = line.split() 886 if len(args) != 2: 887 self.log.info("2 Arguments required: [Offset] [Size]") 888 return 889 offset = int(args[0]) 890 max_bytes = int(args[1]) 891 services = self.pri_dut.sl4f.ble_lib.bleListServices( 892 self.unique_mac_addr_id) 893 for service in services['result']: 894 service_id = service['id'] 895 service_uuid = service['uuid_type'] 896 self.pri_dut.sl4f.gattc_lib.connectToService( 897 self.unique_mac_addr_id, service_id) 898 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 899 print("Reading chars in service uuid: {}".format(service_uuid)) 900 901 for char in chars['result']: 902 char_id = char['id'] 903 char_uuid = char['uuid_type'] 904 try: 905 read_val = self.pri_dut.sl4f.gattc_lib.readLongCharacteristicById( 906 char_id, offset, max_bytes) 907 print(" Char uuid / Result: {} / {}".format( 908 char_uuid, read_val['result'])) 909 except Exception as err: 910 pass 911 except Exception as err: 912 self.log.error(FAILURE.format(cmd, err)) 913 914 def do_gattc_write_all_chars(self, line): 915 """ 916 Description: Write all characteristic values from a GATT server across 917 all services. 918 Assumptions: Basic GATT connection made. 919 Input(s): 920 offset: Required. The offset to start writing on. 921 size: The write value size (value will be generated) 922 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 923 Usage: 924 Examples: 925 gattc_write_all_chars 0 10 926 gattc_write_all_chars 10 1 927 """ 928 cmd = "Read all characteristics from the GATT service." 929 try: 930 args = line.split() 931 if len(args) != 2: 932 self.log.info("2 Arguments required: [Offset] [Size]") 933 return 934 offset = int(args[0]) 935 size = int(args[1]) 936 write_value = [] 937 for i in range(size): 938 write_value.append(i % 256) 939 services = self.pri_dut.sl4f.gattc_lib.listServices( 940 self.unique_mac_addr_id) 941 for service in services['result']: 942 service_id = service['id'] 943 service_uuid = service['uuid_type'] 944 self.pri_dut.sl4f.gattc_lib.connectToService( 945 self.unique_mac_addr_id, service_id) 946 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 947 print("Writing chars in service uuid: {}".format(service_uuid)) 948 949 for char in chars['result']: 950 char_id = char['id'] 951 char_uuid = char['uuid_type'] 952 try: 953 write_result = self.pri_dut.sl4f.gattc_lib.writeCharById( 954 char_id, offset, write_value) 955 print(" Characteristic uuid write result: {} / {}". 956 format(char_uuid, write_result['result'])) 957 except Exception as err: 958 print("error writing char {}".format(err)) 959 except Exception as err: 960 self.log.error(FAILURE.format(cmd, err)) 961 962 def do_gattc_write_all_chars_without_response(self, line): 963 """ 964 Description: Write all characteristic values from a GATT server across 965 all services. 966 Assumptions: Basic GATT connection made. 967 Input(s): 968 size: The write value size (value will be generated). 969 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 970 Usage: 971 Examples: 972 gattc_write_all_chars_without_response 100 973 """ 974 cmd = "Read all characteristics from the GATT service." 975 try: 976 args = line.split() 977 if len(args) != 1: 978 self.log.info("1 Arguments required: [Size]") 979 return 980 size = int(args[0]) 981 write_value = [] 982 for i in range(size): 983 write_value.append(i % 256) 984 services = self.pri_dut.sl4f.gattc_lib.listServices( 985 self.unique_mac_addr_id) 986 for service in services['result']: 987 service_id = service['id'] 988 service_uuid = service['uuid_type'] 989 self.pri_dut.sl4f.gattc_lib.connectToService( 990 self.unique_mac_addr_id, service_id) 991 chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics() 992 print("Reading chars in service uuid: {}".format(service_uuid)) 993 994 for char in chars['result']: 995 char_id = char['id'] 996 char_uuid = char['uuid_type'] 997 try: 998 write_result = \ 999 self.pri_dut.sl4f.gattc_lib.writeCharByIdWithoutResponse( 1000 char_id, write_value) 1001 print(" Characteristic uuid write result: {} / {}". 1002 format(char_uuid, write_result['result'])) 1003 except Exception as err: 1004 pass 1005 except Exception as err: 1006 self.log.error(FAILURE.format(cmd, err)) 1007 1008 def do_gattc_write_char_by_id(self, line): 1009 """ 1010 Description: Write char by characteristic id reference. 1011 Assumptions: Already connected to a GATT server service. 1012 Input(s): 1013 characteristic_id: The characteristic id reference on the GATT 1014 service 1015 offset: The offset value to use 1016 size: Function will generate random bytes by input size. 1017 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1018 Usage: 1019 Examples: 1020 gattc_write_char_by_id char_id 0 5 1021 gattc_write_char_by_id char_id 20 1 1022 """ 1023 cmd = "Write to GATT server characteristic ." 1024 try: 1025 args = line.split() 1026 if len(args) != 3: 1027 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1028 return 1029 id = int(args[0], 16) 1030 offset = int(args[1]) 1031 size = int(args[2]) 1032 write_value = [] 1033 for i in range(size): 1034 write_value.append(i % 256) 1035 self.test_dut.gatt_client_write_characteristic_by_handle( 1036 self.unique_mac_addr_id, id, offset, write_value) 1037 except Exception as err: 1038 self.log.error(FAILURE.format(cmd, err)) 1039 1040 def do_gattc_write_long_char_by_id(self, line): 1041 """ 1042 Description: Write long char by characteristic id reference. 1043 Assumptions: Already connected to a GATT server service. 1044 Input(s): 1045 characteristic_id: The characteristic id reference on the GATT 1046 service 1047 offset: The offset value to use 1048 size: Function will generate random bytes by input size. 1049 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1050 reliable_mode: Optional: Reliable writes represented as bool 1051 Usage: 1052 Examples: 1053 gattc_write_long_char_by_id char_id 0 5 1054 gattc_write_long_char_by_id char_id 20 1 1055 gattc_write_long_char_by_id char_id 20 1 true 1056 gattc_write_long_char_by_id char_id 20 1 false 1057 """ 1058 cmd = "Long Write to GATT server characteristic ." 1059 try: 1060 args = line.split() 1061 if len(args) < 3: 1062 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1063 return 1064 id = int(args[0], 16) 1065 offset = int(args[1]) 1066 size = int(args[2]) 1067 reliable_mode = False 1068 if len(args) > 3: 1069 reliable_mode = self.str_to_bool(args[3]) 1070 write_value = [] 1071 for i in range(size): 1072 write_value.append(i % 256) 1073 self.test_dut.gatt_client_write_long_characteristic_by_handle( 1074 self.unique_mac_addr_id, id, offset, write_value, 1075 reliable_mode) 1076 except Exception as err: 1077 self.log.error(FAILURE.format(cmd, err)) 1078 1079 def do_gattc_write_long_desc_by_id(self, line): 1080 """ 1081 Description: Write long char by descrioptor id reference. 1082 Assumptions: Already connected to a GATT server service. 1083 Input(s): 1084 characteristic_id: The characteristic id reference on the GATT 1085 service 1086 offset: The offset value to use 1087 size: Function will generate random bytes by input size. 1088 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1089 Usage: 1090 Examples: 1091 gattc_write_long_desc_by_id char_id 0 5 1092 gattc_write_long_desc_by_id char_id 20 1 1093 """ 1094 cmd = "Long Write to GATT server descriptor ." 1095 try: 1096 args = line.split() 1097 if len(args) != 3: 1098 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1099 return 1100 id = int(args[0], 16) 1101 offset = int(args[1]) 1102 size = int(args[2]) 1103 write_value = [] 1104 for i in range(size): 1105 write_value.append(i % 256) 1106 self.test_dut.gatt_client_write_long_descriptor_by_handle( 1107 self.unique_mac_addr_id, id, offset, write_value) 1108 except Exception as err: 1109 self.log.error(FAILURE.format(cmd, err)) 1110 1111 def do_gattc_write_char_by_id_without_response(self, line): 1112 """ 1113 Description: Write char by characteristic id reference without response. 1114 Assumptions: Already connected to a GATT server service. 1115 Input(s): 1116 characteristic_id: The characteristic id reference on the GATT 1117 service 1118 size: Function will generate random bytes by input size. 1119 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1120 Usage: 1121 Examples: 1122 gattc_write_char_by_id_without_response char_id 5 1123 """ 1124 cmd = "Write characteristic by id without response." 1125 try: 1126 args = line.split() 1127 if len(args) != 2: 1128 self.log.info("2 Arguments required: [Id] [Size]") 1129 return 1130 id = int(args[0], 16) 1131 size = args[1] 1132 write_value = [] 1133 for i in range(int(size)): 1134 write_value.append(i % 256) 1135 self.test_dut.gatt_client_write_characteristic_without_response_by_handle( 1136 self.unique_mac_addr_id, id, write_value) 1137 except Exception as err: 1138 self.log.error(FAILURE.format(cmd, err)) 1139 1140 def do_gattc_enable_notify_char_by_id(self, line): 1141 """ 1142 Description: Enable Characteristic notification on Characteristic ID. 1143 Assumptions: Already connected to a GATT server service. 1144 Input(s): 1145 characteristic_id: The characteristic id reference on the GATT 1146 service 1147 Usage: 1148 Examples: 1149 gattc_enable_notify_char_by_id char_id 1150 """ 1151 cmd = "Enable notifications by Characteristic id." 1152 try: 1153 id = int(line, 16) 1154 self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle( 1155 self.unique_mac_addr_id, id) 1156 except Exception as err: 1157 self.log.error(FAILURE.format(cmd, err)) 1158 1159 def do_gattc_disable_notify_char_by_id(self, line): 1160 """ 1161 Description: Disable Characteristic notification on Characteristic ID. 1162 Assumptions: Already connected to a GATT server service. 1163 Input(s): 1164 characteristic_id: The characteristic id reference on the GATT 1165 service 1166 Usage: 1167 Examples: 1168 gattc_disable_notify_char_by_id char_id 1169 """ 1170 cmd = "Disable notify Characteristic by id." 1171 try: 1172 id = int(line, 16) 1173 self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle( 1174 self.unique_mac_addr_id, id) 1175 except Exception as err: 1176 self.log.error(FAILURE.format(cmd, err)) 1177 1178 def do_gattc_read_char_by_id(self, line): 1179 """ 1180 Description: Read Characteristic by ID. 1181 Assumptions: Already connected to a GATT server service. 1182 Input(s): 1183 characteristic_id: The characteristic id reference on the GATT 1184 service 1185 Usage: 1186 Examples: 1187 gattc_read_char_by_id char_id 1188 """ 1189 cmd = "Read Characteristic value by ID." 1190 try: 1191 id = int(line, 16) 1192 read_val = self.test_dut.gatt_client_read_characteristic_by_handle( 1193 self.unique_mac_addr_id, id) 1194 self.log.info("Characteristic Value with id {}: {}".format( 1195 id, read_val)) 1196 except Exception as err: 1197 self.log.error(FAILURE.format(cmd, err)) 1198 1199 def do_gattc_read_char_by_uuid(self, characteristic_uuid): 1200 """ 1201 Description: Read Characteristic by UUID (read by type). 1202 Assumptions: Already connected to a GATT server service. 1203 Input(s): 1204 characteristic_uuid: The characteristic id reference on the GATT 1205 service 1206 Usage: 1207 Examples: 1208 gattc_read_char_by_id char_id 1209 """ 1210 cmd = "Read Characteristic value by ID." 1211 try: 1212 short_uuid_len = 4 1213 if len(characteristic_uuid) == short_uuid_len: 1214 characteristic_uuid = BASE_UUID.format(characteristic_uuid) 1215 1216 read_val = self.test_dut.gatt_client_read_characteristic_by_uuid( 1217 self.unique_mac_addr_id, characteristic_uuid) 1218 self.log.info("Characteristic Value with id {}: {}".format( 1219 id, read_val)) 1220 except Exception as err: 1221 self.log.error(FAILURE.format(cmd, err)) 1222 1223 def do_gattc_write_desc_by_id(self, line): 1224 """ 1225 Description: Write Descriptor by characteristic id reference. 1226 Assumptions: Already connected to a GATT server service. 1227 Input(s): 1228 descriptor_id: The Descriptor id reference on the GATT service 1229 offset: The offset value to use 1230 size: Function will generate random bytes by input size. 1231 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1232 Usage: 1233 Examples: 1234 gattc_write_desc_by_id desc_id 0 5 1235 gattc_write_desc_by_id desc_id 20 1 1236 """ 1237 cmd = "Write Descriptor by id." 1238 try: 1239 args = line.split() 1240 id = int(args[0], 16) 1241 offset = int(args[1]) 1242 size = args[2] 1243 write_value = [] 1244 for i in range(int(size)): 1245 write_value.append(i % 256) 1246 write_result = self.test_dut.gatt_client_write_descriptor_by_handle( 1247 self.unique_mac_addr_id, id, offset, write_value) 1248 self.log.info("Descriptor Write result {}: {}".format( 1249 id, write_result)) 1250 except Exception as err: 1251 self.log.error(FAILURE.format(cmd, err)) 1252 1253 def do_gattc_read_desc_by_id(self, line): 1254 """ 1255 Description: Read Descriptor by ID. 1256 Assumptions: Already connected to a GATT server service. 1257 Input(s): 1258 descriptor_id: The Descriptor id reference on the GATT service 1259 Usage: 1260 Examples: 1261 gattc_read_desc_by_id desc_id 1262 """ 1263 cmd = "Read Descriptor by ID." 1264 try: 1265 id = int(line, 16) 1266 read_val = self.test_dut.gatt_client_read_descriptor_by_handle( 1267 self.unique_mac_addr_id, id) 1268 self.log.info("Descriptor Value with id {}: {}".format( 1269 id, read_val)) 1270 except Exception as err: 1271 self.log.error(FAILURE.format(cmd, err)) 1272 1273 def do_gattc_read_long_char_by_id(self, line): 1274 """ 1275 Description: Read long Characteristic value by id. 1276 Assumptions: Already connected to a GATT server service. 1277 Input(s): 1278 characteristic_id: The characteristic id reference on the GATT 1279 service 1280 offset: The offset value to use. 1281 max_bytes: The max bytes size to return. 1282 Usage: 1283 Examples: 1284 gattc_read_long_char_by_id char_id 0 10 1285 gattc_read_long_char_by_id char_id 20 1 1286 """ 1287 cmd = "Read long Characteristic value by id." 1288 try: 1289 args = line.split() 1290 if len(args) != 3: 1291 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1292 return 1293 id = int(args[0], 16) 1294 offset = int(args[1]) 1295 max_bytes = int(args[2]) 1296 read_val = self.test_dut.gatt_client_read_long_characteristic_by_handle( 1297 self.unique_mac_addr_id, id, offset, max_bytes) 1298 self.log.info("Characteristic Value with id {}: {}".format( 1299 id, read_val['result'])) 1300 1301 except Exception as err: 1302 self.log.error(FAILURE.format(cmd, err)) 1303 1304 """End GATT client wrappers""" 1305 """Begin LE scan wrappers""" 1306 1307 def _update_scan_results(self, scan_results): 1308 self.le_ids = [] 1309 for scan in scan_results['result']: 1310 self.le_ids.append(scan['id']) 1311 1312 def do_ble_start_scan(self, line): 1313 """ 1314 Description: Perform a BLE scan. 1315 Default filter name: "" 1316 Optional input: filter_device_name 1317 Usage: 1318 Examples: 1319 ble_start_scan 1320 ble_start_scan eddystone 1321 """ 1322 cmd = "Perform a BLE scan and list discovered devices." 1323 try: 1324 scan_filter = {"name_substring": ""} 1325 if line: 1326 scan_filter = {"name_substring": line} 1327 self.pri_dut.sl4f.gattc_lib.bleStartBleScan(scan_filter) 1328 except Exception as err: 1329 self.log.error(FAILURE.format(cmd, err)) 1330 1331 def do_ble_stop_scan(self, line): 1332 """ 1333 Description: Stops a BLE scan and returns discovered devices. 1334 Usage: 1335 Examples: 1336 ble_stop_scan 1337 """ 1338 cmd = "Stops a BLE scan and returns discovered devices." 1339 try: 1340 scan_results = self.pri_dut.sl4f.gattc_lib.bleStopBleScan() 1341 self._update_scan_results(scan_results) 1342 self.log.info(pprint.pformat(scan_results)) 1343 except Exception as err: 1344 self.log.error(FAILURE.format(cmd, err)) 1345 1346 def do_ble_get_discovered_devices(self, line): 1347 """ 1348 Description: Get discovered LE devices of an active scan. 1349 Usage: 1350 Examples: 1351 ble_stop_scan 1352 """ 1353 cmd = "Get discovered LE devices of an active scan." 1354 try: 1355 scan_results = self.pri_dut.sl4f.gattc_lib.bleGetDiscoveredDevices( 1356 ) 1357 self._update_scan_results(scan_results) 1358 self.log.info(pprint.pformat(scan_results)) 1359 except Exception as err: 1360 self.log.error(FAILURE.format(cmd, err)) 1361 1362 """End LE scan wrappers""" 1363 """Begin GATT Server wrappers""" 1364 1365 def do_gatts_close(self, line): 1366 """ 1367 Description: Close active GATT server. 1368 1369 Usage: 1370 Examples: 1371 gatts_close 1372 """ 1373 cmd = "Close active GATT server." 1374 try: 1375 result = self.pri_dut.sl4f.gatts_lib.closeServer() 1376 self.log.info(result) 1377 except Exception as err: 1378 self.log.error(FAILURE.format(cmd, err)) 1379 1380 def complete_gatts_setup_database(self, text, line, begidx, endidx): 1381 if not text: 1382 completions = list( 1383 gatt_test_database.GATT_SERVER_DB_MAPPING.keys()) 1384 else: 1385 completions = [ 1386 s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys() 1387 if s.startswith(text) 1388 ] 1389 return completions 1390 1391 def do_gatts_setup_database(self, line): 1392 """ 1393 Description: Setup a Gatt server database based on pre-defined inputs. 1394 Supports Tab Autocomplete. 1395 Input(s): 1396 descriptor_db_name: The descriptor db name that matches one in 1397 acts_contrib.test_utils.bt.gatt_test_database 1398 Usage: 1399 Examples: 1400 gatts_setup_database LARGE_DB_1 1401 """ 1402 cmd = "Setup GATT Server Database Based of pre-defined dictionaries" 1403 try: 1404 scan_results = self.pri_dut.sl4f.gatts_lib.publishServer( 1405 gatt_test_database.GATT_SERVER_DB_MAPPING.get(line)) 1406 self.log.info(scan_results) 1407 except Exception as err: 1408 self.log.error(FAILURE.format(cmd, err)) 1409 1410 """End GATT Server wrappers""" 1411 """Begin Bluetooth Controller wrappers""" 1412 1413 def complete_btc_pair(self, text, line, begidx, endidx): 1414 """ Provides auto-complete for btc_pair cmd. 1415 1416 See Cmd module for full description. 1417 """ 1418 arg_completion = len(line.split(" ")) - 1 1419 pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE'] 1420 bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE'] 1421 transport_options = ['BREDR', 'LE'] 1422 if arg_completion == 1: 1423 if not text: 1424 completions = pairing_security_level_options 1425 else: 1426 completions = [ 1427 s for s in pairing_security_level_options 1428 if s.startswith(text) 1429 ] 1430 return completions 1431 if arg_completion == 2: 1432 if not text: 1433 completions = bondable_options 1434 else: 1435 completions = [ 1436 s for s in bondable_options if s.startswith(text) 1437 ] 1438 return completions 1439 if arg_completion == 3: 1440 if not text: 1441 completions = transport_options 1442 else: 1443 completions = [ 1444 s for s in transport_options if s.startswith(text) 1445 ] 1446 return completions 1447 1448 def do_btc_pair(self, line): 1449 """ 1450 Description: Sends an outgoing pairing request. 1451 1452 Input(s): 1453 pairing security level: ENCRYPTED, AUTHENTICATED, or NONE 1454 bondable: BONDABLE, NON_BONDABLE, or NONE 1455 transport: BREDR or LE 1456 1457 Usage: 1458 Examples: 1459 btc_pair NONE NONE BREDR 1460 btc_pair ENCRYPTED NONE LE 1461 btc_pair AUTHENTICATED NONE LE 1462 btc_pair NONE NON_BONDABLE BREDR 1463 """ 1464 cmd = "Send an outgoing pairing request." 1465 pairing_security_level_mapping = { 1466 "ENCRYPTED": 1, 1467 "AUTHENTICATED": 2, 1468 "NONE": None, 1469 } 1470 1471 bondable_mapping = { 1472 "BONDABLE": True, 1473 "NON_BONDABLE": False, 1474 "NONE": None, 1475 } 1476 1477 transport_mapping = { 1478 "BREDR": 1, 1479 "LE": 2, 1480 } 1481 1482 try: 1483 options = line.split(" ") 1484 result = self.test_dut.init_pair( 1485 self.unique_mac_addr_id, 1486 pairing_security_level_mapping.get(options[0]), 1487 bondable_mapping.get(options[1]), 1488 transport_mapping.get(options[2]), 1489 ) 1490 self.log.info(result) 1491 except Exception as err: 1492 self.log.error(FAILURE.format(cmd, err)) 1493 1494 def complete_btc_accept_pairing(self, text, line, begidx, endidx): 1495 """ Provides auto-complete for btc_set_io_capabilities cmd. 1496 1497 See Cmd module for full description. 1498 """ 1499 arg_completion = len(line.split(" ")) - 1 1500 input_options = ['NONE', 'CONFIRMATION', 'KEYBOARD'] 1501 output_options = ['NONE', 'DISPLAY'] 1502 if arg_completion == 1: 1503 if not text: 1504 completions = input_options 1505 else: 1506 completions = [s for s in input_options if s.startswith(text)] 1507 return completions 1508 if arg_completion == 2: 1509 if not text: 1510 completions = output_options 1511 else: 1512 completions = [s for s in output_options if s.startswith(text)] 1513 return completions 1514 1515 def do_btc_accept_pairing(self, line): 1516 """ 1517 Description: Accept all incoming pairing requests. 1518 1519 Input(s): 1520 input: String - The input I/O capabilities to use 1521 Available Values: 1522 NONE - Input capability type None 1523 CONFIRMATION - Input capability type confirmation 1524 KEYBOARD - Input capability type Keyboard 1525 output: String - The output I/O Capabilities to use 1526 Available Values: 1527 NONE - Output capability type None 1528 DISPLAY - output capability type Display 1529 1530 Usage: 1531 Examples: 1532 btc_accept_pairing 1533 btc_accept_pairing NONE DISPLAY 1534 btc_accept_pairing NONE NONE 1535 btc_accept_pairing KEYBOARD DISPLAY 1536 """ 1537 cmd = "Accept incoming pairing requests" 1538 try: 1539 input_capabilities = "NONE" 1540 output_capabilities = "NONE" 1541 options = line.split(" ") 1542 if len(options) > 1: 1543 input_capabilities = options[0] 1544 output_capabilities = options[1] 1545 result = self.pri_dut.sl4f.bts_lib.acceptPairing( 1546 input_capabilities, output_capabilities) 1547 self.log.info(result) 1548 except Exception as err: 1549 self.log.error(FAILURE.format(cmd, err)) 1550 1551 def do_btc_forget_device(self, line): 1552 """ 1553 Description: Forget pairing of the current device under test. 1554 Current device under test is the device found by 1555 tool_refresh_unique_id from custom user param. This function 1556 will also perform a clean disconnect if actively connected. 1557 1558 Usage: 1559 Examples: 1560 btc_forget_device 1561 """ 1562 cmd = "For pairing of the current device under test." 1563 try: 1564 self.log.info("Forgetting device id: {}".format( 1565 self.unique_mac_addr_id)) 1566 result = self.pri_dut.sl4f.bts_lib.forgetDevice( 1567 self.unique_mac_addr_id) 1568 self.log.info(result) 1569 except Exception as err: 1570 self.log.error(FAILURE.format(cmd, err)) 1571 1572 def do_btc_set_discoverable(self, discoverable): 1573 """ 1574 Description: Change Bluetooth Controller discoverablility. 1575 Input(s): 1576 discoverable: true to set discoverable 1577 false to set non-discoverable 1578 Usage: 1579 Examples: 1580 btc_set_discoverable true 1581 btc_set_discoverable false 1582 """ 1583 cmd = "Change Bluetooth Controller discoverablility." 1584 try: 1585 result = self.test_dut.set_discoverable( 1586 self.str_to_bool(discoverable)) 1587 self.log.info(result) 1588 except Exception as err: 1589 self.log.error(FAILURE.format(cmd, err)) 1590 1591 def do_btc_set_name(self, name): 1592 """ 1593 Description: Change Bluetooth Controller local name. 1594 Input(s): 1595 name: The name to set the Bluetooth Controller name to. 1596 1597 Usage: 1598 Examples: 1599 btc_set_name fs_test 1600 """ 1601 cmd = "Change Bluetooth Controller local name." 1602 try: 1603 result = self.test_dut.set_bluetooth_local_name(name) 1604 self.log.info(result) 1605 except Exception as err: 1606 self.log.error(FAILURE.format(cmd, err)) 1607 1608 def do_btc_request_discovery(self, discover): 1609 """ 1610 Description: Change whether the Bluetooth Controller is in active. 1611 discovery or not. 1612 Input(s): 1613 discover: true to start discovery 1614 false to end discovery 1615 Usage: 1616 Examples: 1617 btc_request_discovery true 1618 btc_request_discovery false 1619 """ 1620 cmd = "Change whether the Bluetooth Controller is in active." 1621 try: 1622 result = self.pri_dut.sl4f.bts_lib.requestDiscovery( 1623 self.str_to_bool(discover)) 1624 self.log.info(result) 1625 except Exception as err: 1626 self.log.error(FAILURE.format(cmd, err)) 1627 1628 def do_btc_get_known_remote_devices(self, line): 1629 """ 1630 Description: Get a list of known devices. 1631 1632 Usage: 1633 Examples: 1634 btc_get_known_remote_devices 1635 """ 1636 cmd = "Get a list of known devices." 1637 self.bt_control_devices = [] 1638 try: 1639 device_list = self.pri_dut.sl4f.bts_lib.getKnownRemoteDevices( 1640 )['result'] 1641 for id_dict in device_list: 1642 device = device_list[id_dict] 1643 self.bt_control_devices.append(device) 1644 self.log.info("Device found {}".format(device)) 1645 1646 except Exception as err: 1647 self.log.error(FAILURE.format(cmd, err)) 1648 1649 def do_btc_forget_all_known_devices(self, line): 1650 """ 1651 Description: Forget all known devices. 1652 1653 Usage: 1654 Examples: 1655 btc_forget_all_known_devices 1656 """ 1657 cmd = "Forget all known devices." 1658 try: 1659 device_list = self.pri_dut.sl4f.bts_lib.getKnownRemoteDevices( 1660 )['result'] 1661 for device in device_list: 1662 d = device_list[device] 1663 if d['bonded'] or d['connected']: 1664 self.log.info("Unbonding deivce: {}".format(d)) 1665 self.log.info( 1666 self.pri_dut.sl4f.bts_lib.forgetDevice( 1667 d['id'])['result']) 1668 except Exception as err: 1669 self.log.error(FAILURE.format(cmd, err)) 1670 1671 def do_btc_connect_device(self, line): 1672 """ 1673 Description: Connect to device under test. 1674 Device under test is specified by either user params 1675 or 1676 tool_set_target_device_name <name> 1677 do_tool_refresh_unique_id_using_bt_control 1678 1679 Usage: 1680 Examples: 1681 btc_connect_device 1682 """ 1683 cmd = "Connect to device under test." 1684 try: 1685 result = self.pri_dut.sl4f.bts_lib.connectDevice( 1686 self.unique_mac_addr_id) 1687 self.log.info(result) 1688 except Exception as err: 1689 self.log.error(FAILURE.format(cmd, err)) 1690 1691 def complete_btc_connect_device_by_id(self, text, line, begidx, endidx): 1692 if not text: 1693 completions = list(self.bt_control_ids)[:] 1694 else: 1695 completions = [ 1696 s for s in self.bt_control_ids if s.startswith(text) 1697 ] 1698 return completions 1699 1700 def do_btc_connect_device_by_id(self, device_id): 1701 """ 1702 Description: Connect to device id based on pre-defined inputs. 1703 Supports Tab Autocomplete. 1704 Input(s): 1705 device_id: The device id to connect to. 1706 1707 Usage: 1708 Examples: 1709 btc_connect_device_by_id <device_id> 1710 """ 1711 cmd = "Connect to device id based on pre-defined inputs." 1712 try: 1713 result = self.pri_dut.sl4f.bts_lib.connectDevice(device_id) 1714 self.log.info(result) 1715 except Exception as err: 1716 self.log.error(FAILURE.format(cmd, err)) 1717 1718 def complete_btc_connect_device_by_name(self, text, line, begidx, endidx): 1719 if not text: 1720 completions = list(self.bt_control_names)[:] 1721 else: 1722 completions = [ 1723 s for s in self.bt_control_names if s.startswith(text) 1724 ] 1725 return completions 1726 1727 def do_btc_connect_device_by_name(self, device_name): 1728 """ 1729 Description: Connect to device id based on pre-defined inputs. 1730 Supports Tab Autocomplete. 1731 Input(s): 1732 device_id: The device id to connect to. 1733 1734 Usage: 1735 Examples: 1736 btc_connect_device_by_name <device_id> 1737 """ 1738 cmd = "Connect to device name based on pre-defined inputs." 1739 try: 1740 for device in self.bt_control_devices: 1741 if device_name is device['name']: 1742 1743 result = self.pri_dut.sl4f.bts_lib.connectDevice( 1744 device['id']) 1745 self.log.info(result) 1746 except Exception as err: 1747 self.log.error(FAILURE.format(cmd, err)) 1748 1749 def do_btc_disconnect_device(self, line): 1750 """ 1751 Description: Disconnect to device under test. 1752 Device under test is specified by either user params 1753 or 1754 tool_set_target_device_name <name> 1755 do_tool_refresh_unique_id_using_bt_control 1756 1757 Usage: 1758 Examples: 1759 btc_disconnect_device 1760 """ 1761 cmd = "Disconnect to device under test." 1762 try: 1763 result = self.pri_dut.sl4f.bts_lib.disconnectDevice( 1764 self.unique_mac_addr_id) 1765 self.log.info(result) 1766 except Exception as err: 1767 self.log.error(FAILURE.format(cmd, err)) 1768 1769 def do_btc_init_bluetooth_control(self, line): 1770 """ 1771 Description: Initialize the Bluetooth Controller. 1772 1773 Usage: 1774 Examples: 1775 btc_init_bluetooth_control 1776 """ 1777 cmd = "Initialize the Bluetooth Controller." 1778 try: 1779 result = self.test_dut.initialize_bluetooth_controller() 1780 self.log.info(result) 1781 except Exception as err: 1782 self.log.error(FAILURE.format(cmd, err)) 1783 1784 def do_btc_get_local_address(self, line): 1785 """ 1786 Description: Get the local BR/EDR address of the Bluetooth Controller. 1787 1788 Usage: 1789 Examples: 1790 btc_get_local_address 1791 """ 1792 cmd = "Get the local BR/EDR address of the Bluetooth Controller." 1793 try: 1794 result = self.test_dut.get_local_bluetooth_address() 1795 self.log.info(result) 1796 except Exception as err: 1797 self.log.error(FAILURE.format(cmd, err)) 1798 1799 def do_btc_input_pairing_pin(self, line): 1800 """ 1801 Description: Sends a pairing pin to SL4F's Bluetooth Control's 1802 Pairing Delegate. 1803 1804 Usage: 1805 Examples: 1806 btc_input_pairing_pin 123456 1807 """ 1808 cmd = "Input pairing pin to the Fuchsia device." 1809 try: 1810 result = self.pri_dut.sl4f.bts_lib.inputPairingPin(line)['result'] 1811 self.log.info(result) 1812 except Exception as err: 1813 self.log.error(FAILURE.format(cmd, err)) 1814 1815 def do_btc_get_pairing_pin(self, line): 1816 """ 1817 Description: Gets the pairing pin from SL4F's Bluetooth Control's 1818 Pairing Delegate. 1819 1820 Usage: 1821 Examples: 1822 btc_get_pairing_pin 1823 """ 1824 cmd = "Get the pairing pin from the Fuchsia device." 1825 try: 1826 result = self.pri_dut.sl4f.bts_lib.getPairingPin()['result'] 1827 self.log.info(result) 1828 except Exception as err: 1829 self.log.error(FAILURE.format(cmd, err)) 1830 1831 """End Bluetooth Control wrappers""" 1832 """Begin Profile Server wrappers""" 1833 1834 def do_sdp_pts_example(self, num_of_records): 1835 """ 1836 Description: An example of how to setup a generic SDP record 1837 and SDP search capabilities. This example will pass a few 1838 SDP tests. 1839 1840 Input(s): 1841 num_of_records: The number of records to add. 1842 1843 Usage: 1844 Examples: 1845 sdp_pts_example 1 1846 sdp pts_example 10 1847 """ 1848 cmd = "Setup SDP for PTS testing." 1849 1850 attributes = [ 1851 bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'], 1852 bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'], 1853 bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'], 1854 bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'], 1855 ] 1856 1857 try: 1858 self.pri_dut.sl4f.sdp_lib.addSearch( 1859 attributes, int(sig_uuid_constants['AudioSource'], 16)) 1860 self.pri_dut.sl4f.sdp_lib.addSearch( 1861 attributes, int(sig_uuid_constants['A/V_RemoteControl'], 16)) 1862 self.pri_dut.sl4f.sdp_lib.addSearch( 1863 attributes, int(sig_uuid_constants['PANU'], 16)) 1864 self.pri_dut.sl4f.sdp_lib.addSearch( 1865 attributes, int(sig_uuid_constants['SerialPort'], 16)) 1866 self.pri_dut.sl4f.sdp_lib.addSearch( 1867 attributes, int(sig_uuid_constants['DialupNetworking'], 16)) 1868 self.pri_dut.sl4f.sdp_lib.addSearch( 1869 attributes, int(sig_uuid_constants['OBEXObjectPush'], 16)) 1870 self.pri_dut.sl4f.sdp_lib.addSearch( 1871 attributes, int(sig_uuid_constants['OBEXFileTransfer'], 16)) 1872 self.pri_dut.sl4f.sdp_lib.addSearch( 1873 attributes, int(sig_uuid_constants['Headset'], 16)) 1874 self.pri_dut.sl4f.sdp_lib.addSearch( 1875 attributes, int(sig_uuid_constants['HandsfreeAudioGateway'], 1876 16)) 1877 self.pri_dut.sl4f.sdp_lib.addSearch( 1878 attributes, int(sig_uuid_constants['Handsfree'], 16)) 1879 self.pri_dut.sl4f.sdp_lib.addSearch( 1880 attributes, int(sig_uuid_constants['SIM_Access'], 16)) 1881 for i in range(int(num_of_records)): 1882 result = self.pri_dut.sl4f.sdp_lib.addService( 1883 sdp_pts_record_list[i]) 1884 self.log.info(result) 1885 except Exception as err: 1886 self.log.error(FAILURE.format(cmd, err)) 1887 1888 def do_sdp_cleanup(self, line): 1889 """ 1890 Description: Cleanup any existing SDP records 1891 1892 Usage: 1893 Examples: 1894 sdp_cleanup 1895 """ 1896 cmd = "Cleanup SDP objects." 1897 try: 1898 result = self.pri_dut.sl4f.sdp_lib.cleanUp() 1899 self.log.info(result) 1900 except Exception as err: 1901 self.log.error(FAILURE.format(cmd, err)) 1902 1903 def do_sdp_init(self, line): 1904 """ 1905 Description: Init the profile proxy for setting up SDP records 1906 1907 Usage: 1908 Examples: 1909 sdp_init 1910 """ 1911 cmd = "Initialize profile proxy objects for adding SDP records" 1912 try: 1913 result = self.pri_dut.sl4f.sdp_lib.init() 1914 self.log.info(result) 1915 except Exception as err: 1916 self.log.error(FAILURE.format(cmd, err)) 1917 1918 def do_sdp_connect_l2cap(self, line): 1919 """ 1920 Description: Send an l2cap connection request over an input psm value. 1921 1922 Note: Must be already connected to a peer. 1923 1924 Input(s): 1925 psm: The int hex value to connect over. Available PSMs: 1926 SDP 0x0001 See Bluetooth Service Discovery Protocol (SDP) 1927 RFCOMM 0x0003 See RFCOMM with TS 07.10 1928 TCS-BIN 0x0005 See Bluetooth Telephony Control Specification / 1929 TCS Binary 1930 TCS-BIN-CORDLESS 0x0007 See Bluetooth Telephony Control 1931 Specification / TCS Binary 1932 BNEP 0x000F See Bluetooth Network Encapsulation Protocol 1933 HID_Control 0x0011 See Human Interface Device 1934 HID_Interrupt 0x0013 See Human Interface Device 1935 UPnP 0x0015 See [ESDP] 1936 AVCTP 0x0017 See Audio/Video Control Transport Protocol 1937 AVDTP 0x0019 See Audio/Video Distribution Transport Protocol 1938 AVCTP_Browsing 0x001B See Audio/Video Remote Control Profile 1939 UDI_C-Plane 0x001D See the Unrestricted Digital Information 1940 Profile [UDI] 1941 ATT 0x001F See Bluetooth Core Specification 1942 3DSP 0x0021 See 3D Synchronization Profile. 1943 LE_PSM_IPSP 0x0023 See Internet Protocol Support Profile 1944 (IPSP) 1945 OTS 0x0025 See Object Transfer Service (OTS) 1946 EATT 0x0027 See Bluetooth Core Specification 1947 mode: String - The channel mode to connect to. Available values: 1948 Basic mode: BASIC 1949 Enhanced Retransmission mode: ERTM 1950 1951 Usage: 1952 Examples: 1953 sdp_connect_l2cap 0001 BASIC 1954 sdp_connect_l2cap 0019 ERTM 1955 """ 1956 cmd = "Connect l2cap" 1957 try: 1958 info = line.split() 1959 result = self.pri_dut.sl4f.sdp_lib.connectL2cap( 1960 self.unique_mac_addr_id, int(info[0], 16), info[1]) 1961 self.log.info(result) 1962 except Exception as err: 1963 self.log.error(FAILURE.format(cmd, err)) 1964 1965 """End Profile Server wrappers""" 1966 """Begin AVDTP wrappers""" 1967 1968 def do_avdtp_init(self, initiator_delay): 1969 """ 1970 Description: Init the A2DP component start and AVDTP service to 1971 initiate. 1972 1973 Input(s): 1974 initiator_delay: [Optional] The stream initiator delay to set in 1975 milliseconds. 1976 1977 Usage: 1978 Examples: 1979 avdtp_init 0 1980 avdtp_init 2000 1981 avdtp_init 1982 """ 1983 cmd = "Initialize AVDTP proxy" 1984 try: 1985 if not initiator_delay: 1986 initiator_delay = None 1987 result = self.pri_dut.sl4f.avdtp_lib.init(initiator_delay) 1988 self.log.info(result) 1989 except Exception as err: 1990 self.log.error(FAILURE.format(cmd, err)) 1991 1992 def do_avdtp_kill_a2dp(self, line): 1993 """ 1994 Description: Quickly kill any A2DP components. 1995 1996 Usage: 1997 Examples: 1998 avdtp_kill_a2dp 1999 """ 2000 cmd = "Kill A2DP service" 2001 try: 2002 self.pri_dut.start_v1_component("bt-a2dp") 2003 except Exception as err: 2004 self.log.error(FAILURE.format(cmd, err)) 2005 2006 def do_avdtp_get_connected_peers(self, line): 2007 """ 2008 Description: Get the connected peers for the AVDTP service 2009 2010 Usage: 2011 Examples: 2012 avdtp_get_connected_peers 2013 """ 2014 cmd = "AVDTP get connected peers" 2015 try: 2016 result = self.pri_dut.sl4f.avdtp_lib.getConnectedPeers() 2017 self.log.info(result) 2018 except Exception as err: 2019 self.log.error(FAILURE.format(cmd, err)) 2020 2021 def do_avdtp_set_configuration(self, peer_id): 2022 """ 2023 Description: Send AVDTP command to connected peer: set configuration 2024 2025 Input(s): 2026 peer_id: The specified peer_id. 2027 2028 Usage: 2029 Examples: 2030 avdtp_set_configuration <peer_id> 2031 """ 2032 cmd = "Send AVDTP set configuration to connected peer" 2033 try: 2034 result = self.pri_dut.sl4f.avdtp_lib.setConfiguration(int(peer_id)) 2035 self.log.info(result) 2036 except Exception as err: 2037 self.log.error(FAILURE.format(cmd, err)) 2038 2039 def do_avdtp_get_configuration(self, peer_id): 2040 """ 2041 Description: Send AVDTP command to connected peer: get configuration 2042 2043 Input(s): 2044 peer_id: The specified peer_id. 2045 2046 Usage: 2047 Examples: 2048 avdtp_get_configuration <peer_id> 2049 """ 2050 cmd = "Send AVDTP get configuration to connected peer" 2051 try: 2052 result = self.pri_dut.sl4f.avdtp_lib.getConfiguration(int(peer_id)) 2053 self.log.info(result) 2054 except Exception as err: 2055 self.log.error(FAILURE.format(cmd, err)) 2056 2057 def do_avdtp_get_capabilities(self, peer_id): 2058 """ 2059 Description: Send AVDTP command to connected peer: get capabilities 2060 2061 Input(s): 2062 peer_id: The specified peer_id. 2063 2064 Usage: 2065 Examples: 2066 avdtp_get_capabilities <peer_id> 2067 """ 2068 cmd = "Send AVDTP get capabilities to connected peer" 2069 try: 2070 result = self.pri_dut.sl4f.avdtp_lib.getCapabilities(int(peer_id)) 2071 self.log.info(result) 2072 except Exception as err: 2073 self.log.error(FAILURE.format(cmd, err)) 2074 2075 def do_avdtp_get_all_capabilities(self, peer_id): 2076 """ 2077 Description: Send AVDTP command to connected peer: get all capabilities 2078 2079 Input(s): 2080 peer_id: The specified peer_id. 2081 2082 Usage: 2083 Examples: 2084 avdtp_get_all_capabilities <peer_id> 2085 """ 2086 cmd = "Send AVDTP get all capabilities to connected peer" 2087 try: 2088 result = self.pri_dut.sl4f.avdtp_lib.getAllCapabilities( 2089 int(peer_id)) 2090 self.log.info(result) 2091 except Exception as err: 2092 self.log.error(FAILURE.format(cmd, err)) 2093 2094 def do_avdtp_reconfigure_stream(self, peer_id): 2095 """ 2096 Description: Send AVDTP command to connected peer: reconfigure stream 2097 2098 Input(s): 2099 peer_id: The specified peer_id. 2100 2101 Usage: 2102 Examples: 2103 avdtp_reconfigure_stream <peer_id> 2104 """ 2105 cmd = "Send AVDTP reconfigure stream to connected peer" 2106 try: 2107 result = self.pri_dut.sl4f.avdtp_lib.reconfigureStream( 2108 int(peer_id)) 2109 self.log.info(result) 2110 except Exception as err: 2111 self.log.error(FAILURE.format(cmd, err)) 2112 2113 def do_avdtp_suspend_stream(self, peer_id): 2114 """ 2115 Description: Send AVDTP command to connected peer: suspend stream 2116 2117 Input(s): 2118 peer_id: The specified peer_id. 2119 2120 Usage: 2121 Examples: 2122 avdtp_suspend_stream <peer_id> 2123 """ 2124 cmd = "Send AVDTP suspend stream to connected peer" 2125 try: 2126 result = self.pri_dut.sl4f.avdtp_lib.suspendStream(int(peer_id)) 2127 self.log.info(result) 2128 except Exception as err: 2129 self.log.error(FAILURE.format(cmd, err)) 2130 2131 def do_avdtp_suspend_reconfigure(self, peer_id): 2132 """ 2133 Description: Send AVDTP command to connected peer: suspend reconfigure 2134 2135 Input(s): 2136 peer_id: The specified peer_id. 2137 2138 Usage: 2139 Examples: 2140 avdtp_suspend_reconfigure <peer_id> 2141 """ 2142 cmd = "Send AVDTP suspend reconfigure to connected peer" 2143 try: 2144 result = self.pri_dut.sl4f.avdtp_lib.suspendAndReconfigure( 2145 int(peer_id)) 2146 self.log.info(result) 2147 except Exception as err: 2148 self.log.error(FAILURE.format(cmd, err)) 2149 2150 def do_avdtp_release_stream(self, peer_id): 2151 """ 2152 Description: Send AVDTP command to connected peer: release stream 2153 2154 Input(s): 2155 peer_id: The specified peer_id. 2156 2157 Usage: 2158 Examples: 2159 avdtp_release_stream <peer_id> 2160 """ 2161 cmd = "Send AVDTP release stream to connected peer" 2162 try: 2163 result = self.pri_dut.sl4f.avdtp_lib.releaseStream(int(peer_id)) 2164 self.log.info(result) 2165 except Exception as err: 2166 self.log.error(FAILURE.format(cmd, err)) 2167 2168 def do_avdtp_establish_stream(self, peer_id): 2169 """ 2170 Description: Send AVDTP command to connected peer: establish stream 2171 2172 Input(s): 2173 peer_id: The specified peer_id. 2174 2175 Usage: 2176 Examples: 2177 avdtp_establish_stream <peer_id> 2178 """ 2179 cmd = "Send AVDTP establish stream to connected peer" 2180 try: 2181 result = self.pri_dut.sl4f.avdtp_lib.establishStream(int(peer_id)) 2182 self.log.info(result) 2183 except Exception as err: 2184 self.log.error(FAILURE.format(cmd, err)) 2185 2186 def do_avdtp_start_stream(self, peer_id): 2187 """ 2188 Description: Send AVDTP command to connected peer: start stream 2189 2190 Input(s): 2191 peer_id: The specified peer_id. 2192 2193 Usage: 2194 Examples: 2195 avdtp_start_stream <peer_id> 2196 """ 2197 cmd = "Send AVDTP start stream to connected peer" 2198 try: 2199 result = self.pri_dut.sl4f.avdtp_lib.startStream(int(peer_id)) 2200 self.log.info(result) 2201 except Exception as err: 2202 self.log.error(FAILURE.format(cmd, err)) 2203 2204 def do_avdtp_abort_stream(self, peer_id): 2205 """ 2206 Description: Send AVDTP command to connected peer: abort stream 2207 2208 Input(s): 2209 peer_id: The specified peer_id. 2210 2211 Usage: 2212 Examples: 2213 avdtp_abort_stream <peer_id> 2214 """ 2215 cmd = "Send AVDTP abort stream to connected peer" 2216 try: 2217 result = self.pri_dut.sl4f.avdtp_lib.abortStream(int(peer_id)) 2218 self.log.info(result) 2219 except Exception as err: 2220 self.log.error(FAILURE.format(cmd, err)) 2221 2222 def do_avdtp_remove_service(self, line): 2223 """ 2224 Description: Removes the AVDTP service in use. 2225 2226 Usage: 2227 Examples: 2228 avdtp_establish_stream <peer_id> 2229 """ 2230 cmd = "Remove AVDTP service" 2231 try: 2232 result = self.pri_dut.sl4f.avdtp_lib.removeService() 2233 self.log.info(result) 2234 except Exception as err: 2235 self.log.error(FAILURE.format(cmd, err)) 2236 2237 """End AVDTP wrappers""" 2238 """Begin Audio wrappers""" 2239 2240 def do_audio_start_output_save(self, line): 2241 """ 2242 Description: Start audio output save 2243 2244 Usage: 2245 Examples: 2246 audio_start_output_save 2247 """ 2248 cmd = "Start audio capture" 2249 try: 2250 result = self.pri_dut.sl4f.audio_lib.startOutputSave() 2251 self.log.info(result) 2252 except Exception as err: 2253 self.log.error(FAILURE.format(cmd, err)) 2254 2255 def do_audio_stop_output_save(self, line): 2256 """ 2257 Description: Stop audio output save 2258 2259 Usage: 2260 Examples: 2261 audio_stop_output_save 2262 """ 2263 cmd = "Stop audio capture" 2264 try: 2265 result = self.pri_dut.sl4f.audio_lib.stopOutputSave() 2266 self.log.info(result) 2267 except Exception as err: 2268 self.log.error(FAILURE.format(cmd, err)) 2269 2270 def do_audio_get_output_audio(self, line): 2271 """ 2272 Description: Get the audio output saved to a local file 2273 2274 Usage: 2275 Examples: 2276 audio_get_output_audio 2277 """ 2278 cmd = "Get audio capture" 2279 try: 2280 save_path = "{}/{}".format(self.pri_dut.log_path, "audio.raw") 2281 result = self.pri_dut.sl4f.audio_lib.getOutputAudio(save_path) 2282 except Exception as err: 2283 self.log.error(FAILURE.format(cmd, err)) 2284 2285 def do_audio_5_min_test(self, line): 2286 """ 2287 Description: Capture and anlyize sine audio waves played from a Bluetooth A2DP 2288 Source device. 2289 2290 Pre steps: 2291 1. Pair A2DP source device 2292 2. Prepare generated SOX file over preferred codec on source device. 2293 Quick way to generate necessary audio files: 2294 sudo apt-get install sox 2295 sox -b 16 -r 48000 -c 2 -n audio_file_2k1k_5_min.wav synth 300 sine 2000 sine 3000 2296 2297 Usage: 2298 Examples: 2299 audio_5_min_test 2300 """ 2301 cmd = "5 min audio capture test" 2302 input("Press Enter once Source device is streaming audio file") 2303 try: 2304 result = self.pri_dut.sl4f.audio_lib.startOutputSave() 2305 self.log.info(result) 2306 for i in range(5): 2307 print("Minutes left: {}".format(10 - i)) 2308 time.sleep(60) 2309 result = self.pri_dut.sl4f.audio_lib.stopOutputSave() 2310 log_time = int(time.time()) 2311 save_path = "{}/{}".format(self.pri_dut.log_path, 2312 "{}_audio.raw".format(log_time)) 2313 analysis_path = "{}/{}".format( 2314 self.pri_dut.log_path, 2315 "{}_audio_analysis.txt".format(log_time)) 2316 result = self.pri_dut.sl4f.audio_lib.getOutputAudio(save_path) 2317 2318 channels = 1 2319 try: 2320 quality_analysis(filename=save_path, 2321 output_file=analysis_path, 2322 bit_width=audio_bits_per_sample_32, 2323 rate=audio_sample_rate_48000, 2324 channel=channels, 2325 spectral_only=False) 2326 2327 except Exception as err: 2328 self.log.error("Failed to analyze raw audio: {}".format(err)) 2329 return False 2330 2331 self.log.info("Analysis output here: {}".format(analysis_path)) 2332 self.log.info("Analysis Results: {}".format( 2333 open(analysis_path).readlines())) 2334 except Exception as err: 2335 self.log.error(FAILURE.format(cmd, err)) 2336 2337 """End Audio wrappers""" 2338 """Begin HFP wrappers""" 2339 2340 def do_hfp_init(self, line): 2341 """ 2342 Description: Init the HFP component initiate. 2343 2344 Usage: 2345 Examples: 2346 hfp_init 2347 """ 2348 cmd = "Initialize HFP proxy" 2349 try: 2350 result = self.pri_dut.sl4f.hfp_lib.init() 2351 self.log.info(result) 2352 except Exception as err: 2353 self.log.error(FAILURE.format(cmd, err)) 2354 2355 def do_hfp_remove_service(self, line): 2356 """ 2357 Description: Removes the HFP service in use. 2358 2359 Usage: 2360 Examples: 2361 hfp_remove_service 2362 """ 2363 cmd = "Remove HFP service" 2364 try: 2365 result = self.pri_dut.sl4f.hfp_lib.removeService() 2366 self.log.info(result) 2367 except Exception as err: 2368 self.log.error(FAILURE.format(cmd, err)) 2369 2370 def do_hfp_list_peers(self, line): 2371 """ 2372 Description: List all HFP Hands-Free peers connected to the DUT. 2373 2374 Input(s): 2375 2376 Usage: 2377 Examples: 2378 hfp_list_peers 2379 """ 2380 cmd = "Lists connected peers" 2381 try: 2382 result = self.pri_dut.sl4f.hfp_lib.listPeers() 2383 self.log.info(pprint.pformat(result)) 2384 except Exception as err: 2385 self.log.error(FAILURE.format(cmd, err)) 2386 2387 def do_hfp_set_active_peer(self, line): 2388 """ 2389 Description: Set the active HFP Hands-Free peer for the DUT. 2390 2391 Input(s): 2392 peer_id: The id of the peer to be set active. 2393 2394 Usage: 2395 Examples: 2396 hfp_set_active_peer <peer_id> 2397 """ 2398 cmd = "Set the active peer" 2399 try: 2400 peer_id = int(line.strip()) 2401 result = self.pri_dut.sl4f.hfp_lib.setActivePeer(peer_id) 2402 self.log.info(result) 2403 except Exception as err: 2404 self.log.error(FAILURE.format(cmd, err)) 2405 2406 def do_hfp_list_calls(self, line): 2407 """ 2408 Description: List all calls known to the sl4f component on the DUT. 2409 2410 Input(s): 2411 2412 Usage: 2413 Examples: 2414 hfp_list_calls 2415 """ 2416 cmd = "Lists all calls" 2417 try: 2418 result = self.pri_dut.sl4f.hfp_lib.listCalls() 2419 self.log.info(pprint.pformat(result)) 2420 except Exception as err: 2421 self.log.error(FAILURE.format(cmd, err)) 2422 2423 def do_hfp_new_call(self, line): 2424 """ 2425 Description: Simulate a call on the call manager 2426 2427 Input(s): 2428 remote: The number of the remote party on the simulated call 2429 state: The state of the call. Must be one of "ringing", "waiting", 2430 "dialing", "alerting", "active", "held". 2431 direction: The direction of the call. Must be one of "incoming", "outgoing". 2432 2433 Usage: 2434 Examples: 2435 hfp_new_call <remote> <state> <direction> 2436 hfp_new_call 14085555555 active incoming 2437 hfp_new_call 14085555555 held outgoing 2438 hfp_new_call 14085555555 ringing incoming 2439 hfp_new_call 14085555555 waiting incoming 2440 hfp_new_call 14085555555 alerting outgoing 2441 hfp_new_call 14085555555 dialing outgoing 2442 """ 2443 cmd = "Simulates a call" 2444 try: 2445 info = line.strip().split() 2446 if len(info) != 3: 2447 raise ValueError( 2448 "Exactly three command line arguments required: <remote> <state> <direction>" 2449 ) 2450 remote, state, direction = info[0], info[1], info[2] 2451 result = self.pri_dut.sl4f.hfp_lib.newCall(remote, state, 2452 direction) 2453 self.log.info(result) 2454 except Exception as err: 2455 self.log.error(FAILURE.format(cmd, err)) 2456 2457 def do_hfp_incoming_call(self, line): 2458 """ 2459 Description: Simulate an incoming call on the call manager 2460 2461 Input(s): 2462 remote: The number of the remote party on the incoming call 2463 2464 Usage: 2465 Examples: 2466 hfp_incoming_call <remote> 2467 hfp_incoming_call 14085555555 2468 """ 2469 cmd = "Simulates an incoming call" 2470 try: 2471 remote = line.strip() 2472 result = self.pri_dut.sl4f.hfp_lib.initiateIncomingCall(remote) 2473 self.log.info(result) 2474 except Exception as err: 2475 self.log.error(FAILURE.format(cmd, err)) 2476 2477 def do_hfp_waiting_call(self, line): 2478 """ 2479 Description: Simulate an incoming call on the call manager when there is 2480 an onging active call already. 2481 2482 Input(s): 2483 remote: The number of the remote party on the incoming call 2484 2485 Usage: 2486 Examples: 2487 hfp_waiting_call <remote> 2488 hfp_waiting_call 14085555555 2489 """ 2490 cmd = "Simulates an incoming call" 2491 try: 2492 remote = line.strip() 2493 result = self.pri_dut.sl4f.hfp_lib.initiateIncomingWaitingCall( 2494 remote) 2495 self.log.info(result) 2496 except Exception as err: 2497 self.log.error(FAILURE.format(cmd, err)) 2498 2499 def do_hfp_outgoing_call(self, line): 2500 """ 2501 Description: Simulate an outgoing call on the call manager 2502 2503 Input(s): 2504 remote: The number of the remote party on the outgoing call 2505 2506 Usage: 2507 Examples: 2508 hfp_outgoing_call <remote> 2509 """ 2510 cmd = "Simulates an outgoing call" 2511 try: 2512 remote = line.strip() 2513 result = self.pri_dut.sl4f.hfp_lib.initiateOutgoingCall(remote) 2514 self.log.info(result) 2515 except Exception as err: 2516 self.log.error(FAILURE.format(cmd, err)) 2517 2518 def do_hfp_set_call_active(self, line): 2519 """ 2520 Description: Set the specified call to the "OngoingActive" state. 2521 2522 Input(s): 2523 call_id: The unique id of the call. 2524 2525 Usage: 2526 Examples: 2527 hfp_outgoing_call <call_id> 2528 """ 2529 cmd = "Set the specified call to active" 2530 try: 2531 call_id = int(line.strip()) 2532 result = self.pri_dut.sl4f.hfp_lib.setCallActive(call_id) 2533 self.log.info(result) 2534 except Exception as err: 2535 self.log.error(FAILURE.format(cmd, err)) 2536 2537 def do_hfp_set_call_held(self, line): 2538 """ 2539 Description: Set the specified call to the "OngoingHeld" state. 2540 2541 Input(s): 2542 call_id: The unique id of the call. 2543 2544 Usage: 2545 Examples: 2546 hfp_outgoing_call <call_id> 2547 """ 2548 cmd = "Set the specified call to held" 2549 try: 2550 call_id = int(line.strip()) 2551 result = self.pri_dut.sl4f.hfp_lib.setCallHeld(call_id) 2552 self.log.info(result) 2553 except Exception as err: 2554 self.log.error(FAILURE.format(cmd, err)) 2555 2556 def do_hfp_set_call_terminated(self, line): 2557 """ 2558 Description: Set the specified call to the "Terminated" state. 2559 2560 Input(s): 2561 call_id: The unique id of the call. 2562 2563 Usage: 2564 Examples: 2565 hfp_outgoing_call <call_id> 2566 """ 2567 cmd = "Set the specified call to terminated" 2568 try: 2569 call_id = int(line.strip()) 2570 result = self.pri_dut.sl4f.hfp_lib.setCallTerminated(call_id) 2571 self.log.info(result) 2572 except Exception as err: 2573 self.log.error(FAILURE.format(cmd, err)) 2574 2575 def do_hfp_set_call_transferred_to_ag(self, line): 2576 """ 2577 Description: Set the specified call to the "TransferredToAg" state. 2578 2579 Input(s): 2580 call_id: The unique id of the call. 2581 2582 Usage: 2583 Examples: 2584 hfp_outgoing_call <call_id> 2585 """ 2586 cmd = "Set the specified call to TransferredToAg" 2587 try: 2588 call_id = int(line.strip()) 2589 result = self.pri_dut.sl4f.hfp_lib.setCallTransferredToAg(call_id) 2590 self.log.info(result) 2591 except Exception as err: 2592 self.log.error(FAILURE.format(cmd, err)) 2593 2594 def do_hfp_set_speaker_gain(self, line): 2595 """ 2596 Description: Set the active peer's speaker gain. 2597 2598 Input(s): 2599 value: The gain value to set. Must be between 0-15 inclusive. 2600 2601 Usage: 2602 Examples: 2603 hfp_set_speaker_gain <value> 2604 """ 2605 cmd = "Set the active peer's speaker gain" 2606 try: 2607 value = int(line.strip()) 2608 result = self.pri_dut.sl4f.hfp_lib.setSpeakerGain(value) 2609 self.log.info(result) 2610 except Exception as err: 2611 self.log.error(FAILURE.format(cmd, err)) 2612 2613 def do_hfp_set_microphone_gain(self, line): 2614 """ 2615 Description: Set the active peer's microphone gain. 2616 2617 Input(s): 2618 value: The gain value to set. Must be between 0-15 inclusive. 2619 2620 Usage: 2621 Examples: 2622 hfp_set_microphone_gain <value> 2623 """ 2624 cmd = "Set the active peer's microphone gain" 2625 try: 2626 value = int(line.strip()) 2627 result = self.pri_dut.sl4f.hfp_lib.setMicrophoneGain(value) 2628 self.log.info(result) 2629 except Exception as err: 2630 self.log.error(FAILURE.format(cmd, err)) 2631 2632 def do_hfp_set_service_available(self, line): 2633 """ 2634 Description: Sets the simulated network service status reported by the call manager. 2635 2636 Input(s): 2637 value: "true" to set the network connection to available. 2638 2639 Usage: 2640 Examples: 2641 hfp_set_service_available <value> 2642 hfp_set_service_available true 2643 hfp_set_service_available false 2644 """ 2645 cmd = "Sets the simulated network service status reported by the call manager" 2646 try: 2647 value = line.strip() == "true" 2648 result = self.pri_dut.sl4f.hfp_lib.setServiceAvailable(value) 2649 self.log.info(result) 2650 except Exception as err: 2651 self.log.error(FAILURE.format(cmd, err)) 2652 2653 def do_hfp_set_roaming(self, line): 2654 """ 2655 Description: Sets the simulated roaming status reported by the call manager. 2656 2657 Input(s): 2658 value: "true" to set the network connection to roaming. 2659 2660 Usage: 2661 Examples: 2662 hfp_set_roaming <value> 2663 hfp_set_roaming true 2664 hfp_set_roaming false 2665 """ 2666 cmd = "Sets the simulated roaming status reported by the call manager" 2667 try: 2668 value = line.strip() == "true" 2669 result = self.pri_dut.sl4f.hfp_lib.setRoaming(value) 2670 self.log.info(result) 2671 except Exception as err: 2672 self.log.error(FAILURE.format(cmd, err)) 2673 2674 def do_hfp_set_signal_strength(self, line): 2675 """ 2676 Description: Sets the simulated signal strength reported by the call manager. 2677 2678 Input(s): 2679 value: The signal strength value to set. Must be between 0-5 inclusive. 2680 2681 Usage: 2682 Examples: 2683 hfp_set_signal_strength <value> 2684 hfp_set_signal_strength 0 2685 hfp_set_signal_strength 3 2686 hfp_set_signal_strength 5 2687 """ 2688 cmd = "Sets the simulated signal strength reported by the call manager" 2689 try: 2690 value = int(line.strip()) 2691 result = self.pri_dut.sl4f.hfp_lib.setSignalStrength(value) 2692 self.log.info(result) 2693 except Exception as err: 2694 self.log.error(FAILURE.format(cmd, err)) 2695 2696 def do_hfp_set_subscriber_number(self, line): 2697 """ 2698 Description: Sets the subscriber number reported by the call manager. 2699 2700 Input(s): 2701 value: The subscriber number to set. Maximum length 128 characters. 2702 2703 Usage: 2704 Examples: 2705 hfp_set_subscriber_number <value> 2706 hfp_set_subscriber_number 14085555555 2707 """ 2708 cmd = "Sets the subscriber number reported by the call manager" 2709 try: 2710 value = line.strip() 2711 result = self.pri_dut.sl4f.hfp_lib.setSubscriberNumber(value) 2712 self.log.info(result) 2713 except Exception as err: 2714 self.log.error(FAILURE.format(cmd, err)) 2715 2716 def do_hfp_set_operator(self, line): 2717 """ 2718 Description: Sets the operator value reported by the call manager. 2719 2720 Input(s): 2721 value: The operator value to set. Maximum length 16 characters. 2722 2723 Usage: 2724 Examples: 2725 hfp_set_operator <value> 2726 hfp_set_operator GoogleFi 2727 """ 2728 cmd = "Sets the operator value reported by the call manager" 2729 try: 2730 value = line.strip() 2731 result = self.pri_dut.sl4f.hfp_lib.setOperator(value) 2732 self.log.info(result) 2733 except Exception as err: 2734 self.log.error(FAILURE.format(cmd, err)) 2735 2736 def do_hfp_set_nrec_support(self, line): 2737 """ 2738 Description: Sets the noise reduction/echo cancelation support reported by the call manager. 2739 2740 Input(s): 2741 value: The nrec support bool. 2742 2743 Usage: 2744 Examples: 2745 hfp_set_nrec_support <value> 2746 hfp_set_nrec_support true 2747 hfp_set_nrec_support false 2748 """ 2749 cmd = "Sets the noise reduction/echo cancelation support reported by the call manager" 2750 try: 2751 value = line.strip() == "true" 2752 result = self.pri_dut.sl4f.hfp_lib.setNrecSupport(value) 2753 self.log.info(result) 2754 except Exception as err: 2755 self.log.error(FAILURE.format(cmd, err)) 2756 2757 def do_hfp_set_battery_level(self, line): 2758 """ 2759 Description: Sets the battery level reported by the call manager. 2760 2761 Input(s): 2762 value: The integer battery level value. Must be 0-5 inclusive. 2763 2764 Usage: 2765 Examples: 2766 hfp_set_battery_level <value> 2767 hfp_set_battery_level 0 2768 hfp_set_battery_level 3 2769 """ 2770 cmd = "Set the battery level reported by the call manager" 2771 try: 2772 value = int(line.strip()) 2773 result = self.pri_dut.sl4f.hfp_lib.setBatteryLevel(value) 2774 self.log.info(result) 2775 except Exception as err: 2776 self.log.error(FAILURE.format(cmd, err)) 2777 2778 def do_hfp_set_last_dialed(self, line): 2779 """ 2780 Description: Sets the last dialed number in the call manager. 2781 2782 Input(s): 2783 number: The number of the remote party. 2784 2785 Usage: 2786 Examples: 2787 hfp_set_last_dialed <number> 2788 hfp_set_last_dialed 14085555555 2789 """ 2790 cmd = "Sets the last dialed number in the call manager." 2791 try: 2792 number = line.strip() 2793 result = self.pri_dut.sl4f.hfp_lib.setLastDialed(number) 2794 self.log.info(result) 2795 except Exception as err: 2796 self.log.error(FAILURE.format(cmd, err)) 2797 2798 def do_hfp_clear_last_dialed(self, line): 2799 """ 2800 Description: Clears the last dialed number in the call manager. 2801 2802 Usage: 2803 Examples: 2804 hfp_clear_last_dialed 2805 """ 2806 cmd = "Clears the last dialed number in the call manager." 2807 try: 2808 result = self.pri_dut.sl4f.hfp_lib.clearLastDialed() 2809 self.log.info(result) 2810 except Exception as err: 2811 self.log.error(FAILURE.format(cmd, err)) 2812 2813 def do_hfp_set_memory_location(self, line): 2814 """ 2815 Description: Sets a memory location to point to a remote number. 2816 2817 Input(s): 2818 location: The memory location at which to store the number. 2819 number: The number of the remote party to be stored. 2820 2821 Usage: 2822 Examples: 2823 hfp_set_memory_location <location> <number> 2824 hfp_set_memory_location 0 14085555555 2825 """ 2826 cmd = "Sets a memory location to point to a remote number." 2827 try: 2828 info = line.strip().split() 2829 if len(info) != 2: 2830 raise ValueError( 2831 "Exactly two command line arguments required: <location> <number>" 2832 ) 2833 location, number = info[0], info[1] 2834 result = self.pri_dut.sl4f.hfp_lib.setMemoryLocation( 2835 location, number) 2836 self.log.info(result) 2837 except Exception as err: 2838 self.log.error(FAILURE.format(cmd, err)) 2839 2840 def do_hfp_clear_memory_location(self, line): 2841 """ 2842 Description: Sets a memory location to point to a remote number. 2843 2844 Input(s): 2845 localtion: The memory location to clear. 2846 2847 Usage: 2848 Examples: 2849 hfp_clear_memory_location <location> 2850 hfp_clear_memory_location 0 2851 """ 2852 cmd = "Sets a memory location to point to a remote number." 2853 try: 2854 location = line.strip() 2855 result = self.pri_dut.sl4f.hfp_lib.clearMemoryLocation(location) 2856 self.log.info(result) 2857 except Exception as err: 2858 self.log.error(FAILURE.format(cmd, err)) 2859 2860 def do_hfp_set_dial_result(self, line): 2861 """ 2862 Description: Sets the status result to be returned when the number is dialed. 2863 2864 Input(s): 2865 number: The number of the remote party. 2866 status: The status to be returned when an outgoing call is initiated to the number. 2867 2868 Usage: 2869 Examples: 2870 hfp_set_battery_level <value> 2871 """ 2872 cmd = "Sets the status result to be returned when the number is dialed." 2873 try: 2874 info = line.strip().split() 2875 if len(info) != 2: 2876 raise ValueError( 2877 "Exactly two command line arguments required: <number> <status>" 2878 ) 2879 number, status = info[0], int(info[1]) 2880 result = self.pri_dut.sl4f.hfp_lib.setDialResult(number, status) 2881 self.log.info(pprint.pformat(result)) 2882 except Exception as err: 2883 self.log.error(FAILURE.format(cmd, err)) 2884 2885 def do_hfp_get_state(self, line): 2886 """ 2887 Description: Get the call manager's complete state 2888 2889 Usage: 2890 Examples: 2891 hfp_get_state 2892 """ 2893 cmd = "Get the call manager's state" 2894 try: 2895 result = self.pri_dut.sl4f.hfp_lib.getState() 2896 self.log.info(pprint.pformat(result)) 2897 except Exception as err: 2898 self.log.error(FAILURE.format(cmd, err)) 2899 2900 def do_hfp_set_connection_behavior(self, line): 2901 """ 2902 Description: Set the Service Level Connection (SLC) behavior when a new peer connects. 2903 2904 Input(s): 2905 autoconnect: Enable/Disable autoconnection of SLC. 2906 2907 Usage: 2908 Examples: 2909 hfp_set_connection_behavior <autoconnect> 2910 hfp_set_connection_behavior true 2911 hfp_set_connection_behavior false 2912 """ 2913 cmd = "Set the Service Level Connection (SLC) behavior" 2914 try: 2915 autoconnect = line.strip().lower() == "true" 2916 result = self.pri_dut.sl4f.hfp_lib.setConnectionBehavior( 2917 autoconnect) 2918 self.log.info(result) 2919 except Exception as err: 2920 self.log.error(FAILURE.format(cmd, err)) 2921 2922 """End HFP wrappers""" 2923 """Begin RFCOMM wrappers""" 2924 2925 def do_rfcomm_init(self, line): 2926 """ 2927 Description: Initialize the RFCOMM component services. 2928 2929 Usage: 2930 Examples: 2931 rfcomm_init 2932 """ 2933 cmd = "Initialize RFCOMM proxy" 2934 try: 2935 result = self.pri_dut.sl4f.rfcomm_lib.init() 2936 self.log.info(result) 2937 except Exception as err: 2938 self.log.error(FAILURE.format(cmd, err)) 2939 2940 def do_rfcomm_remove_service(self, line): 2941 """ 2942 Description: Removes the RFCOMM service in use. 2943 2944 Usage: 2945 Examples: 2946 rfcomm_remove_service 2947 """ 2948 cmd = "Remove RFCOMM service" 2949 try: 2950 result = self.pri_dut.sl4f.rfcomm_lib.removeService() 2951 self.log.info(result) 2952 except Exception as err: 2953 self.log.error(FAILURE.format(cmd, err)) 2954 2955 def do_rfcomm_disconnect_session(self, line): 2956 """ 2957 Description: Closes the RFCOMM Session. 2958 2959 Usage: 2960 Examples: 2961 rfcomm_disconnect_session 2962 rfcomm_disconnect_session 2963 """ 2964 cmd = "Disconnect the RFCOMM Session" 2965 try: 2966 result = self.pri_dut.sl4f.rfcomm_lib.disconnectSession( 2967 self.unique_mac_addr_id) 2968 self.log.info(result) 2969 except Exception as err: 2970 self.log.error(FAILURE.format(cmd, err)) 2971 2972 def do_rfcomm_connect_rfcomm_channel(self, line): 2973 """ 2974 Description: Make an outgoing RFCOMM connection. 2975 2976 Usage: 2977 Examples: 2978 rfcomm_connect_rfcomm_channel <server_channel_number> 2979 rfcomm_connect_rfcomm_channel 2 2980 """ 2981 cmd = "Make an outgoing RFCOMM connection" 2982 try: 2983 server_channel_number = int(line.strip()) 2984 result = self.pri_dut.sl4f.rfcomm_lib.connectRfcommChannel( 2985 self.unique_mac_addr_id, server_channel_number) 2986 self.log.info(result) 2987 except Exception as err: 2988 self.log.error(FAILURE.format(cmd, err)) 2989 2990 def do_rfcomm_disconnect_rfcomm_channel(self, line): 2991 """ 2992 Description: Close the RFCOMM connection with the peer 2993 2994 Usage: 2995 Examples: 2996 rfcomm_disconnect_rfcomm_channel <server_channel_number> 2997 rfcomm_disconnect_rfcomm_channel 2 2998 """ 2999 cmd = "Close the RFCOMM channel" 3000 try: 3001 server_channel_number = int(line.strip()) 3002 result = self.pri_dut.sl4f.rfcomm_lib.disconnectRfcommChannel( 3003 self.unique_mac_addr_id, server_channel_number) 3004 self.log.info(result) 3005 except Exception as err: 3006 self.log.error(FAILURE.format(cmd, err)) 3007 3008 def do_rfcomm_send_remote_line_status(self, line): 3009 """ 3010 Description: Send a remote line status for the RFCOMM channel. 3011 3012 Usage: 3013 Examples: 3014 rfcomm_send_remote_line_status <server_channel_number> 3015 rfcomm_send_remote_line_status 2 3016 """ 3017 cmd = "Send a remote line status update for the RFCOMM channel" 3018 try: 3019 server_channel_number = int(line.strip()) 3020 result = self.pri_dut.sl4f.rfcomm_lib.sendRemoteLineStatus( 3021 self.unique_mac_addr_id, server_channel_number) 3022 self.log.info(result) 3023 except Exception as err: 3024 self.log.error(FAILURE.format(cmd, err)) 3025 3026 def do_rfcomm_write_rfcomm(self, line): 3027 """ 3028 Description: Send data over the RFCOMM channel. 3029 3030 Usage: 3031 Examples: 3032 rfcomm_write_rfcomm <server_channel_number> <data> 3033 rfcomm_write_rfcomm 2 foobar 3034 """ 3035 cmd = "Send data using the RFCOMM channel" 3036 try: 3037 info = line.strip().split() 3038 if len(info) != 2: 3039 raise ValueError( 3040 "Exactly two command line arguments required: <server_channel_number> <data>" 3041 ) 3042 server_channel_number = int(info[0]) 3043 data = info[1] 3044 result = self.pri_dut.sl4f.rfcomm_lib.writeRfcomm( 3045 self.unique_mac_addr_id, server_channel_number, data) 3046 self.log.info(result) 3047 except Exception as err: 3048 self.log.error(FAILURE.format(cmd, err)) 3049 3050 """End RFCOMM wrappers""" 3051