1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Python script for wrappers to various libraries.
18
19Class CmdInput inherts from the cmd library.
20
21Functions that start with "do_" have a method
22signature that doesn't match the actual command
23line command and that is intended. This is so the
24"help" command knows what to display (in this case
25the documentation of the command itself).
26
27For example:
28Looking at the function "do_tool_set_target_device_name"
29has the inputs self and line which is expected of this type
30of method signature. When the "help" command is done on the
31method name you get the function documentation as such:
32
33(Cmd) help tool_set_target_device_name
34
35        Description: Reset the target device name.
36        Input(s):
37            device_name: Required. The advertising name to connect to.
38        Usage: tool_set_target_device_name new_target_device name
39          Examples:
40            tool_set_target_device_name le_watch
41
42This is all to say this documentation pattern is expected.
43
44"""
45
46from acts_contrib.test_utils.audio_analysis_lib.check_quality import quality_analysis
47from acts_contrib.test_utils.bt.bt_constants import audio_bits_per_sample_32
48from acts_contrib.test_utils.bt.bt_constants import audio_sample_rate_48000
49from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
50from acts_contrib.test_utils.bt.bt_constants import bt_attribute_values
51from acts_contrib.test_utils.bt.bt_constants import sig_appearance_constants
52from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants
53from acts_contrib.test_utils.fuchsia.sdp_records import sdp_pts_record_list
54
55import acts_contrib.test_utils.bt.gatt_test_database as gatt_test_database
56
57import cmd
58import pprint
59import time
60"""Various Global Strings"""
61BASE_UUID = sig_uuid_constants['BASE_UUID']
62CMD_LOG = "CMD {} result: {}"
63FAILURE = "CMD {} threw exception: {}"
64BASIC_ADV_NAME = "fs_test"
65
66
67class CommandInput(cmd.Cmd):
68    ble_adv_interval = 1000
69    ble_adv_appearance = None
70    ble_adv_data_include_tx_power_level = False
71    ble_adv_include_name = True
72    ble_adv_include_scan_response = False
73    ble_adv_name = "fs_test"
74    ble_adv_data_manufacturer_data = None
75    ble_adv_data_service_data = None
76    ble_adv_data_service_uuid_list = None
77    ble_adv_data_uris = None
78
79    bt_control_ids = []
80    bt_control_names = []
81    bt_control_devices = []
82    bt_scan_poll_timer = 0.5
83    target_device_name = ""
84    le_ids = []
85    unique_mac_addr_id = None
86
87    def setup_vars(self, dut, target_device_name, log):
88        self.pri_dut = dut
89        # Note: test_dut is the start of a slow conversion from a Fuchsia specific
90        # Tool to an abstract_device tool. Only commands that use test_dut will work
91        # Otherwise this tool is primarially targeted at Fuchsia devices.
92        self.test_dut = create_bluetooth_device(self.pri_dut)
93        self.test_dut.initialize_bluetooth_controller()
94        self.target_device_name = target_device_name
95        self.log = log
96
97    def emptyline(self):
98        pass
99
100    def do_EOF(self, line):
101        "End Script"
102        return True
103
104    """ Useful Helper functions and cmd line tooling """
105
106    def str_to_bool(self, s):
107        if s.lower() == 'true':
108            return True
109        elif s.lower() == 'false':
110            return False
111
112    def _find_unique_id_over_le(self):
113        scan_filter = {"name_substring": self.target_device_name}
114        self.unique_mac_addr_id = None
115        self.pri_dut.sl4f.gattc_lib.bleStartBleScan(scan_filter)
116        tries = 10
117        for i in range(tries):
118            time.sleep(self.bt_scan_poll_timer)
119            scan_res = self.pri_dut.sl4f.gattc_lib.bleGetDiscoveredDevices(
120            )['result']
121            for device in scan_res:
122                name, did, connectable = device["name"], device["id"], device[
123                    "connectable"]
124                if (self.target_device_name in name):
125                    self.unique_mac_addr_id = did
126                    self.log.info(
127                        "Successfully found device: name, id: {}, {}".format(
128                            name, did))
129                    break
130            if self.unique_mac_addr_id:
131                break
132        self.pri_dut.sl4f.gattc_lib.bleStopBleScan()
133
134    def _find_unique_id_over_bt_control(self):
135        self.unique_mac_addr_id = None
136        self.bt_control_devices = []
137        self.pri_dut.sl4f.bts_lib.requestDiscovery(True)
138        tries = 10
139        for i in range(tries):
140            if self.unique_mac_addr_id:
141                break
142            time.sleep(self.bt_scan_poll_timer)
143            device_list = self.pri_dut.sl4f.bts_lib.getKnownRemoteDevices(
144            )['result']
145            for id_dict in device_list:
146                device = device_list[id_dict]
147                self.bt_control_devices.append(device)
148                name = None
149                if device['name'] is not None:
150                    name = device['name']
151                did, address = device['id'], device['address']
152
153                self.bt_control_ids.append(did)
154                if name is not None:
155                    self.bt_control_names.append(name)
156                    if self.target_device_name in name:
157                        self.unique_mac_addr_id = did
158                        self.log.info(
159                            "Successfully found device: name, id, address: {}, {}, {}"
160                            .format(name, did, address))
161                        break
162        self.pri_dut.sl4f.bts_lib.requestDiscovery(False)
163
164    def do_tool_take_bt_snoop_log(self, custom_name):
165        """
166        Description: Takes the bt snoop log from the Fuchsia device.
167        Logs will show up in your config files' logpath directory.
168
169        Input(s):
170            custom_name: Optional. Override the default pcap file name.
171
172        Usage: tool_set_target_device_name new_target_device name
173          Examples:
174            tool_take_bt_snoop_log connection_error
175            tool_take_bt_snoop_log
176        """
177        self.pri_dut.take_bt_snoop_log(custom_name)
178
179    def do_tool_refresh_unique_id(self, line):
180        """
181        Description: Refresh command line tool mac unique id.
182        Usage:
183          Examples:
184            tool_refresh_unique_id
185        """
186        try:
187            self._find_unique_id_over_le()
188        except Exception as err:
189            self.log.error(
190                "Failed to scan or find scan result: {}".format(err))
191
192    def do_tool_refresh_unique_id_using_bt_control(self, line):
193        """
194        Description: Refresh command line tool mac unique id.
195        Usage:
196          Examples:
197            tool_refresh_unique_id_using_bt_control
198        """
199        try:
200            self._find_unique_id_over_bt_control()
201        except Exception as err:
202            self.log.error(
203                "Failed to scan or find scan result: {}".format(err))
204
205    def do_tool_set_target_device_name(self, line):
206        """
207        Description: Reset the target device name.
208        Input(s):
209            device_name: Required. The advertising name to connect to.
210        Usage: tool_set_target_device_name new_target_device name
211          Examples:
212            tool_set_target_device_name le_watch
213        """
214        self.log.info("Setting target_device_name to: {}".format(line))
215        self.target_device_name = line
216
217    def do_tool_set_unique_mac_addr_id(self, line):
218        """
219        Description: Sets the unique mac address id (Specific to Fuchsia)
220        Input(s):
221            device_id: Required. The id to set the unique mac address id to
222        Usage: tool_set_unique_mac_addr_id device_id
223          Examples:
224            tool_set_unique_mac_addr_id 7fb2cae53aad9e0d
225        """
226        self.unique_mac_addr_id = line
227
228    """Begin BLE advertise wrappers"""
229
230    def complete_ble_adv_data_include_name(self, text, line, begidx, endidx):
231        roles = ["true", "false"]
232        if not text:
233            completions = roles
234        else:
235            completions = [s for s in roles if s.startswith(text)]
236        return completions
237
238    def do_ble_adv_data_include_name(self, line):
239        cmd = "Include name in the advertisement."
240        try:
241            self.ble_adv_include_name = self.str_to_bool(line)
242        except Exception as err:
243            self.log.error(FAILURE.format(cmd, err))
244
245    def do_ble_adv_data_set_name(self, line):
246        cmd = "Set the name to be included in the advertisement."
247        try:
248            self.ble_adv_name = line
249        except Exception as err:
250            self.log.error(FAILURE.format(cmd, err))
251
252    def complete_ble_adv_data_set_appearance(self, text, line, begidx, endidx):
253        if not text:
254            completions = list(sig_appearance_constants.keys())
255        else:
256            completions = [
257                s for s in sig_appearance_constants.keys()
258                if s.startswith(text)
259            ]
260        return completions
261
262    def do_ble_adv_data_set_appearance(self, line):
263        cmd = "Set the appearance to known SIG values."
264        try:
265            self.ble_adv_appearance = line
266        except Exception as err:
267            self.log.error(FAILURE.format(cmd, err))
268
269    def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx,
270                                                     endidx):
271        options = ['true', 'false']
272        if not text:
273            completions = list(options)[:]
274        else:
275            completions = [s for s in options if s.startswith(text)]
276        return completions
277
278    def do_ble_adv_data_include_tx_power_level(self, line):
279        """Include the tx_power_level in the advertising data.
280        Description: Adds tx_power_level to the advertisement data to the BLE
281            advertisement.
282        Input(s):
283            value: Required. True or False
284        Usage: ble_adv_data_include_tx_power_level bool_value
285          Examples:
286            ble_adv_data_include_tx_power_level true
287            ble_adv_data_include_tx_power_level false
288        """
289        cmd = "Include tx_power_level in advertisement."
290        try:
291            self.ble_adv_data_include_tx_power_level = self.str_to_bool(line)
292        except Exception as err:
293            self.log.info(FAILURE.format(cmd, err))
294
295    def complete_ble_adv_include_scan_response(self, text, line, begidx,
296                                               endidx):
297        options = ['true', 'false']
298        if not text:
299            completions = list(options)[:]
300        else:
301            completions = [s for s in options if s.startswith(text)]
302        return completions
303
304    def do_ble_adv_include_scan_response(self, line):
305        """Include scan response in advertisement. inputs: [true|false]
306            Note: Currently just sets the scan response data to the
307                Advertisement data.
308        """
309        cmd = "Include tx_power_level in advertisement."
310        try:
311            self.ble_adv_include_scan_response = self.str_to_bool(line)
312        except Exception as err:
313            self.log.info(FAILURE.format(cmd, err))
314
315    def do_ble_adv_data_add_manufacturer_data(self, line):
316        """Include manufacturer id and data to the advertisment
317        Description: Adds manufacturer data to the BLE advertisement.
318        Input(s):
319            id: Required. The int representing the manufacturer id.
320            data: Required. The string representing the data.
321        Usage: ble_adv_data_add_manufacturer_data id data
322          Examples:
323            ble_adv_data_add_manufacturer_data 1 test
324        """
325        cmd = "Include manufacturer id and data to the advertisment."
326        try:
327
328            info = line.split()
329            if self.ble_adv_data_manufacturer_data is None:
330                self.ble_adv_data_manufacturer_data = []
331            self.ble_adv_data_manufacturer_data.append({
332                "id": int(info[0]),
333                "data": info[1]
334            })
335        except Exception as err:
336            self.log.info(FAILURE.format(cmd, err))
337
338    def do_ble_adv_data_add_service_data(self, line):
339        """Include service data to the advertisment
340        Description: Adds service data to the BLE advertisement.
341        Input(s):
342            uuid: Required. The string representing the uuid.
343            data: Required. The string representing the data.
344        Usage: ble_adv_data_add_service_data uuid data
345          Examples:
346            ble_adv_data_add_service_data 00001801-0000-1000-8000-00805f9b34fb test
347        """
348        cmd = "Include manufacturer id and data to the advertisment."
349        try:
350            info = line.split()
351            if self.ble_adv_data_service_data is None:
352                self.ble_adv_data_service_data = []
353            self.ble_adv_data_service_data.append({
354                "uuid": info[0],
355                "data": info[1]
356            })
357        except Exception as err:
358            self.log.info(FAILURE.format(cmd, err))
359
360    def do_ble_adv_add_service_uuid_list(self, line):
361        """Include a list of service uuids to the advertisment:
362        Description: Adds service uuid list to the BLE advertisement.
363        Input(s):
364            uuid: Required. A list of N string UUIDs to add.
365        Usage: ble_adv_add_service_uuid_list uuid0 uuid1 ... uuidN
366          Examples:
367            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb
368            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 00001802-0000-1000-8000-00805f9b34fb
369        """
370        cmd = "Include service uuid list to the advertisment data."
371        try:
372            self.ble_adv_data_service_uuid_list = line
373        except Exception as err:
374            self.log.info(FAILURE.format(cmd, err))
375
376    def do_ble_adv_data_set_uris(self, uris):
377        """Set the URIs of the LE advertisement data:
378        Description: Adds list of String UIRs
379          See (RFC 3986 1.1.2 https://tools.ietf.org/html/rfc3986)
380          Valid URI examples:
381            ftp://ftp.is.co.za/rfc/rfc1808.txt
382            http://www.ietf.org/rfc/rfc2396.txt
383            ldap://[2001:db8::7]/c=GB?objectClass?one
384            mailto:John.Doe@example.com
385            news:comp.infosystems.www.servers.unix
386            tel:+1-816-555-1212
387            telnet://192.0.2.16:80/
388            urn:oasis:names:specification:docbook:dtd:xml:4.1.2
389        Input(s):
390            uris: Required. A list of URIs to add.
391        Usage: ble_adv_data_set_uris uri0 uri1 ... uriN
392          Examples:
393            ble_adv_data_set_uris telnet://192.0.2.16:80/
394            ble_adv_data_set_uris tel:+1-816-555-1212
395        """
396        cmd = "Set the appearance to known SIG values."
397        try:
398            self.ble_adv_data_uris = uris.split()
399        except Exception as err:
400            self.log.error(FAILURE.format(cmd, err))
401
402    def start_advertisement(self, connectable):
403        """ Handle setting advertising data and the advertisement
404            Note: After advertisement is successful, clears values set for
405                * Manufacturer data
406                * Appearance information
407                * Scan Response
408                * Service UUIDs
409                * URI list
410            Args:
411                connectable: Bool of whether to start a connectable
412                    advertisement or not.
413        """
414        adv_data_name = self.ble_adv_name
415        if not self.ble_adv_include_name:
416            adv_data_name = None
417
418        manufacturer_data = self.ble_adv_data_manufacturer_data
419
420        tx_power_level = None
421        if self.ble_adv_data_include_tx_power_level:
422            tx_power_level = 1  # Not yet implemented so set to 1
423
424        scan_response = self.ble_adv_include_scan_response
425
426        adv_data = {
427            "name": adv_data_name,
428            "appearance": self.ble_adv_appearance,
429            "service_data": self.ble_adv_data_service_data,
430            "tx_power_level": tx_power_level,
431            "service_uuids": self.ble_adv_data_service_uuid_list,
432            "manufacturer_data": manufacturer_data,
433            "uris": self.ble_adv_data_uris,
434        }
435
436        if not self.ble_adv_include_scan_response:
437            scan_response = None
438        else:
439            scan_response = adv_data
440
441        result = self.pri_dut.sl4f.ble_lib.bleStartBleAdvertising(
442            adv_data, scan_response, self.ble_adv_interval, connectable)
443        self.log.info("Result of starting advertisement: {}".format(result))
444        self.ble_adv_data_manufacturer_data = None
445        self.ble_adv_appearance = None
446        self.ble_adv_include_scan_response = False
447        self.ble_adv_data_service_uuid_list = None
448        self.ble_adv_data_uris = None
449        self.ble_adv_data_service_data = None
450
451    def do_ble_start_generic_connectable_advertisement(self, line):
452        """
453        Description: Start a connectable LE advertisement
454
455        Usage: ble_start_generic_connectable_advertisement
456        """
457        cmd = "Start a connectable LE advertisement"
458        try:
459            connectable = True
460            self.start_advertisement(connectable)
461        except Exception as err:
462            self.log.error(FAILURE.format(cmd, err))
463
464    def do_ble_start_generic_nonconnectable_advertisement(self, line):
465        """
466        Description: Start a non-connectable LE advertisement
467
468        Usage: ble_start_generic_nonconnectable_advertisement
469        """
470        cmd = "Start a nonconnectable LE advertisement"
471        try:
472            connectable = False
473            self.start_advertisement(connectable)
474        except Exception as err:
475            self.log.error(FAILURE.format(cmd, err))
476
477    def do_ble_stop_advertisement(self, line):
478        """
479        Description: Stop a BLE advertisement.
480        Usage: ble_stop_advertisement
481        """
482        cmd = "Stop a connectable LE advertisement"
483        try:
484            self.pri_dut.sl4f.ble_lib.bleStopBleAdvertising()
485        except Exception as err:
486            self.log.error(FAILURE.format(cmd, err))
487
488    """End BLE advertise wrappers"""
489    """Begin GATT client wrappers"""
490
491    def complete_gattc_connect_by_id(self, text, line, begidx, endidx):
492        if not text:
493            completions = list(self.le_ids)[:]
494        else:
495            completions = [s for s in self.le_ids if s.startswith(text)]
496        return completions
497
498    def do_gattc_connect_by_id(self, line):
499        """
500        Description: Connect to a LE peripheral.
501        Input(s):
502            device_id: Required. The unique device ID from Fuchsia
503                discovered devices.
504        Usage:
505          Examples:
506            gattc_connect device_id
507        """
508        cmd = "Connect to a LE peripheral by input ID."
509        try:
510
511            connection_status = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral(
512                line)
513            self.log.info("Connection status: {}".format(
514                pprint.pformat(connection_status)))
515        except Exception as err:
516            self.log.error(FAILURE.format(cmd, err))
517
518    def do_gattc_connect(self, line):
519        """
520        Description: Connect to a LE peripheral.
521        Optional input: device_name
522        Input(s):
523            device_name: Optional. The peripheral ID to connect to.
524        Usage:
525          Examples:
526            gattc_connect
527            gattc_connect eddystone_123
528        """
529        cmd = "Connect to a LE peripheral."
530        try:
531            if len(line) > 0:
532                self.target_device_name = line
533                self.unique_mac_addr_id = None
534            if not self.unique_mac_addr_id:
535                try:
536                    self._find_unique_id()
537                except Exception as err:
538                    self.log.info("Failed to scan or find device.")
539                    return
540            connection_status = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral(
541                self.unique_mac_addr_id)
542            self.log.info("Connection status: {}".format(
543                pprint.pformat(connection_status)))
544        except Exception as err:
545            self.log.error(FAILURE.format(cmd, err))
546
547    def do_gattc_connect_disconnect_iterations(self, line):
548        """
549        Description: Connect then disconnect to a LE peripheral multiple times.
550        Input(s):
551            iterations: Required. The number of iterations to run.
552        Usage:
553          Examples:
554            gattc_connect_disconnect_iterations 10
555        """
556        cmd = "Connect to a LE peripheral."
557        try:
558            if not self.unique_mac_addr_id:
559                try:
560                    self._find_unique_id()
561                except Exception as err:
562                    self.log.info("Failed to scan or find device.")
563                    return
564            for i in range(int(line)):
565                self.log.info("Running iteration {}".format(i + 1))
566                connection_status = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral(
567                    self.unique_mac_addr_id)
568                self.log.info("Connection status: {}".format(
569                    pprint.pformat(connection_status)))
570                time.sleep(4)
571                disc_status = self.pri_dut.sl4f.gattc_lib.bleDisconnectPeripheral(
572                    self.unique_mac_addr_id)
573                self.log.info("Disconnect status: {}".format(disc_status))
574                time.sleep(3)
575        except Exception as err:
576            self.log.error(FAILURE.format(cmd, err))
577
578    def do_gattc_disconnect(self, line):
579        """
580        Description: Disconnect from LE peripheral.
581        Assumptions: Already connected to a peripheral.
582        Usage:
583          Examples:
584            gattc_disconnect
585        """
586        cmd = "Disconenct from LE peripheral."
587        try:
588            disconnect_status = self.pri_dut.sl4f.gattc_lib.bleDisconnectPeripheral(
589                self.unique_mac_addr_id)
590            self.log.info("Disconnect status: {}".format(disconnect_status))
591        except Exception as err:
592            self.log.error(FAILURE.format(cmd, err))
593
594    def do_gattc_list_services(self, discover_chars):
595        """
596        Description: List services from LE peripheral.
597        Assumptions: Already connected to a peripheral.
598        Input(s):
599            discover_chars: Optional. An optional input to discover all
600                characteristics on the service.
601        Usage:
602          Examples:
603            gattc_list_services
604            gattc_list_services true
605        """
606        cmd = "List services from LE peripheral."
607        try:
608
609            services = self.pri_dut.sl4f.gattc_lib.listServices(
610                self.unique_mac_addr_id)
611            self.log.info("Discovered Services: \n{}".format(
612                pprint.pformat(services)))
613            discover_characteristics = self.str_to_bool(discover_chars)
614            if discover_chars:
615                for service in services.get('result'):
616                    self.pri_dut.sl4f.gattc_lib.connectToService(
617                        self.unique_mac_addr_id, service.get('id'))
618                    chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics(
619                    )
620                    self.log.info("Discovered chars:\n{}".format(
621                        pprint.pformat(chars)))
622
623        except Exception as err:
624            self.log.error(FAILURE.format(cmd, err))
625
626    def do_gattc_connect_to_service(self, line):
627        """
628        Description: Connect to Peripheral GATT server service.
629        Assumptions: Already connected to peripheral.
630        Input(s):
631            service_id: Required. The service id reference on the GATT server.
632        Usage:
633          Examples:
634            gattc_connect_to_service service_id
635        """
636        cmd = "GATT client connect to GATT server service."
637        try:
638            self.pri_dut.sl4f.gattc_lib.connectToService(
639                self.unique_mac_addr_id, int(line))
640        except Exception as err:
641            self.log.error(FAILURE.format(cmd, err))
642
643    def do_gattc_discover_characteristics(self, line):
644        """
645        Description: Discover characteristics from a connected service.
646        Assumptions: Already connected to a GATT server service.
647        Usage:
648          Examples:
649            gattc_discover_characteristics
650        """
651        cmd = "Discover and list characteristics from a GATT server."
652        try:
653            chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
654            self.log.info("Discovered chars:\n{}".format(
655                pprint.pformat(chars)))
656        except Exception as err:
657            self.log.error(FAILURE.format(cmd, err))
658
659    def do_gattc_notify_all_chars(self, line):
660        """
661        Description: Enable all notifications on all Characteristics on
662            a GATT server.
663        Assumptions: Basic GATT connection made.
664        Usage:
665          Examples:
666            gattc_notify_all_chars
667        """
668        cmd = "Read all characteristics from the GATT service."
669        try:
670            services = self.pri_dut.sl4f.gattc_lib.listServices(
671                self.unique_mac_addr_id)
672            for service in services['result']:
673                service_id = service['id']
674                service_uuid = service['uuid_type']
675                self.pri_dut.sl4f.gattc_lib.connectToService(
676                    self.unique_mac_addr_id, service_id)
677                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
678                print("Reading chars in service uuid: {}".format(service_uuid))
679
680                for char in chars['result']:
681                    char_id = char['id']
682                    char_uuid = char['uuid_type']
683                    # quick char filter for apple-4 test... remove later
684                    print("found uuid {}".format(char_uuid))
685                    try:
686                        self.pri_dut.sl4f.gattc_lib.enableNotifyCharacteristic(
687                            char_id)
688                    except Exception as err:
689                        print("error enabling notification")
690        except Exception as err:
691            self.log.error(FAILURE.format(cmd, err))
692
693    def do_gattc_read_all_chars(self, line):
694        """
695        Description: Read all Characteristic values from a GATT server across
696            all services.
697        Assumptions: Basic GATT connection made.
698        Usage:
699          Examples:
700            gattc_read_all_chars
701        """
702        cmd = "Read all characteristics from the GATT service."
703        try:
704            services = self.pri_dut.sl4f.gattc_lib.listServices(
705                self.unique_mac_addr_id)
706            for service in services['result']:
707                service_id = service['id']
708                service_uuid = service['uuid_type']
709                self.pri_dut.sl4f.gattc_lib.connectToService(
710                    self.unique_mac_addr_id, service_id)
711                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
712                print("Reading chars in service uuid: {}".format(service_uuid))
713
714                for char in chars['result']:
715                    char_id = char['id']
716                    char_uuid = char['uuid_type']
717                    try:
718                        read_val =  \
719                            self.pri_dut.sl4f.gattc_lib.readCharacteristicById(
720                                char_id)
721                        print("  Characteristic uuid / Value: {} / {}".format(
722                            char_uuid, read_val['result']))
723                        str_value = ""
724                        for val in read_val['result']:
725                            str_value += chr(val)
726                        print("    str val: {}".format(str_value))
727                    except Exception as err:
728                        print(err)
729        except Exception as err:
730            self.log.error(FAILURE.format(cmd, err))
731
732    def do_gattc_read_all_desc(self, line):
733        """
734        Description: Read all Descriptors values from a GATT server across
735            all services.
736        Assumptions: Basic GATT connection made.
737        Usage:
738          Examples:
739            gattc_read_all_chars
740        """
741        cmd = "Read all descriptors from the GATT service."
742        try:
743            services = self.pri_dut.sl4f.gattc_lib.listServices(
744                self.unique_mac_addr_id)
745            for service in services['result']:
746                service_id = service['id']
747                service_uuid = service['uuid_type']
748                self.pri_dut.sl4f.gattc_lib.connectToService(
749                    self.unique_mac_addr_id, service_id)
750                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
751                print("Reading descs in service uuid: {}".format(service_uuid))
752
753                for char in chars['result']:
754                    char_id = char['id']
755                    char_uuid = char['uuid_type']
756                    descriptors = char['descriptors']
757                    print("  Reading descs in char uuid: {}".format(char_uuid))
758                    for desc in descriptors:
759                        desc_id = desc["id"]
760                        desc_uuid = desc["uuid_type"]
761                    try:
762                        read_val = self.pri_dut.sl4f.gattc_lib.readDescriptorById(
763                            desc_id)
764                        print("    Descriptor uuid / Value: {} / {}".format(
765                            desc_uuid, read_val['result']))
766                    except Exception as err:
767                        pass
768        except Exception as err:
769            self.log.error(FAILURE.format(cmd, err))
770
771    def do_gattc_write_all_desc(self, line):
772        """
773        Description: Write a value to all Descriptors on the GATT server.
774        Assumptions: Basic GATT connection made.
775        Input(s):
776            offset: Required. The offset to start writing to.
777            size: Required. The size of bytes to write (value will be generated).
778                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
779        Usage:
780          Examples:
781            gattc_write_all_desc 0 100
782            gattc_write_all_desc 10 2
783        """
784        cmd = "Read all descriptors from the GATT service."
785        try:
786            args = line.split()
787            if len(args) != 2:
788                self.log.info("2 Arguments required: [Offset] [Size]")
789                return
790            offset = int(args[0])
791            size = args[1]
792            write_value = []
793            for i in range(int(size)):
794                write_value.append(i % 256)
795            services = self.pri_dut.sl4f.gattc_lib.listServices(
796                self.unique_mac_addr_id)
797            for service in services['result']:
798                service_id = service['id']
799                service_uuid = service['uuid_type']
800                self.pri_dut.sl4f.gattc_lib.connectToService(
801                    self.unique_mac_addr_id, service_id)
802                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
803                print("Writing descs in service uuid: {}".format(service_uuid))
804
805                for char in chars['result']:
806                    char_id = char['id']
807                    char_uuid = char['uuid_type']
808                    descriptors = char['descriptors']
809                    print("  Reading descs in char uuid: {}".format(char_uuid))
810                    for desc in descriptors:
811                        desc_id = desc["id"]
812                        desc_uuid = desc["uuid_type"]
813                    try:
814                        write_val = self.pri_dut.sl4f.gattc_lib.writeDescriptorById(
815                            desc_id, offset, write_value)
816                        print("    Descriptor uuid / Result: {} / {}".format(
817                            desc_uuid, write_val['result']))
818                    except Exception as err:
819                        pass
820        except Exception as err:
821            self.log.error(FAILURE.format(cmd, err))
822
823    def do_gattc_read_all_long_desc(self, line):
824        """
825        Description: Read all long Characteristic Descriptors
826        Assumptions: Basic GATT connection made.
827        Input(s):
828            offset: Required. The offset to start reading from.
829            max_bytes: Required. The max size of bytes to return.
830        Usage:
831          Examples:
832            gattc_read_all_long_desc 0 100
833            gattc_read_all_long_desc 10 20
834        """
835        cmd = "Read all long descriptors from the GATT service."
836        try:
837            args = line.split()
838            if len(args) != 2:
839                self.log.info("2 Arguments required: [Offset] [Size]")
840                return
841            offset = int(args[0])
842            max_bytes = int(args[1])
843            services = self.pri_dut.sl4f.ble_lib.bleListServices(
844                self.unique_mac_addr_id)
845            for service in services['result']:
846                service_id = service['id']
847                service_uuid = service['uuid_type']
848                self.pri_dut.sl4f.gattc_lib.connectToService(
849                    self.unique_mac_addr_id, service_id)
850                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
851                print("Reading descs in service uuid: {}".format(service_uuid))
852
853                for char in chars['result']:
854                    char_id = char['id']
855                    char_uuid = char['uuid_type']
856                    descriptors = char['descriptors']
857                    print("  Reading descs in char uuid: {}".format(char_uuid))
858                    for desc in descriptors:
859                        desc_id = desc["id"]
860                        desc_uuid = desc["uuid_type"]
861                    try:
862                        read_val = self.pri_dut.sl4f.gattc_lib.readLongDescriptorById(
863                            desc_id, offset, max_bytes)
864                        print("    Descriptor uuid / Result: {} / {}".format(
865                            desc_uuid, read_val['result']))
866                    except Exception as err:
867                        pass
868        except Exception as err:
869            self.log.error(FAILURE.format(cmd, err))
870
871    def do_gattc_read_all_long_char(self, line):
872        """
873        Description: Read all long Characteristic
874        Assumptions: Basic GATT connection made.
875        Input(s):
876            offset: Required. The offset to start reading from.
877            max_bytes: Required. The max size of bytes to return.
878        Usage:
879          Examples:
880            gattc_read_all_long_char 0 100
881            gattc_read_all_long_char 10 20
882        """
883        cmd = "Read all long Characteristics from the GATT service."
884        try:
885            args = line.split()
886            if len(args) != 2:
887                self.log.info("2 Arguments required: [Offset] [Size]")
888                return
889            offset = int(args[0])
890            max_bytes = int(args[1])
891            services = self.pri_dut.sl4f.ble_lib.bleListServices(
892                self.unique_mac_addr_id)
893            for service in services['result']:
894                service_id = service['id']
895                service_uuid = service['uuid_type']
896                self.pri_dut.sl4f.gattc_lib.connectToService(
897                    self.unique_mac_addr_id, service_id)
898                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
899                print("Reading chars in service uuid: {}".format(service_uuid))
900
901                for char in chars['result']:
902                    char_id = char['id']
903                    char_uuid = char['uuid_type']
904                    try:
905                        read_val = self.pri_dut.sl4f.gattc_lib.readLongCharacteristicById(
906                            char_id, offset, max_bytes)
907                        print("    Char uuid / Result: {} / {}".format(
908                            char_uuid, read_val['result']))
909                    except Exception as err:
910                        pass
911        except Exception as err:
912            self.log.error(FAILURE.format(cmd, err))
913
914    def do_gattc_write_all_chars(self, line):
915        """
916        Description: Write all characteristic values from a GATT server across
917            all services.
918        Assumptions: Basic GATT connection made.
919        Input(s):
920            offset: Required. The offset to start writing on.
921            size: The write value size (value will be generated)
922                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
923        Usage:
924          Examples:
925            gattc_write_all_chars 0 10
926            gattc_write_all_chars 10 1
927        """
928        cmd = "Read all characteristics from the GATT service."
929        try:
930            args = line.split()
931            if len(args) != 2:
932                self.log.info("2 Arguments required: [Offset] [Size]")
933                return
934            offset = int(args[0])
935            size = int(args[1])
936            write_value = []
937            for i in range(size):
938                write_value.append(i % 256)
939            services = self.pri_dut.sl4f.gattc_lib.listServices(
940                self.unique_mac_addr_id)
941            for service in services['result']:
942                service_id = service['id']
943                service_uuid = service['uuid_type']
944                self.pri_dut.sl4f.gattc_lib.connectToService(
945                    self.unique_mac_addr_id, service_id)
946                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
947                print("Writing chars in service uuid: {}".format(service_uuid))
948
949                for char in chars['result']:
950                    char_id = char['id']
951                    char_uuid = char['uuid_type']
952                    try:
953                        write_result = self.pri_dut.sl4f.gattc_lib.writeCharById(
954                            char_id, offset, write_value)
955                        print("  Characteristic uuid write result: {} / {}".
956                              format(char_uuid, write_result['result']))
957                    except Exception as err:
958                        print("error writing char {}".format(err))
959        except Exception as err:
960            self.log.error(FAILURE.format(cmd, err))
961
962    def do_gattc_write_all_chars_without_response(self, line):
963        """
964        Description: Write all characteristic values from a GATT server across
965            all services.
966        Assumptions: Basic GATT connection made.
967        Input(s):
968            size: The write value size (value will be generated).
969                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
970        Usage:
971          Examples:
972            gattc_write_all_chars_without_response 100
973        """
974        cmd = "Read all characteristics from the GATT service."
975        try:
976            args = line.split()
977            if len(args) != 1:
978                self.log.info("1 Arguments required: [Size]")
979                return
980            size = int(args[0])
981            write_value = []
982            for i in range(size):
983                write_value.append(i % 256)
984            services = self.pri_dut.sl4f.gattc_lib.listServices(
985                self.unique_mac_addr_id)
986            for service in services['result']:
987                service_id = service['id']
988                service_uuid = service['uuid_type']
989                self.pri_dut.sl4f.gattc_lib.connectToService(
990                    self.unique_mac_addr_id, service_id)
991                chars = self.pri_dut.sl4f.gattc_lib.discoverCharacteristics()
992                print("Reading chars in service uuid: {}".format(service_uuid))
993
994                for char in chars['result']:
995                    char_id = char['id']
996                    char_uuid = char['uuid_type']
997                    try:
998                        write_result = \
999                            self.pri_dut.sl4f.gattc_lib.writeCharByIdWithoutResponse(
1000                                char_id, write_value)
1001                        print("  Characteristic uuid write result: {} / {}".
1002                              format(char_uuid, write_result['result']))
1003                    except Exception as err:
1004                        pass
1005        except Exception as err:
1006            self.log.error(FAILURE.format(cmd, err))
1007
1008    def do_gattc_write_char_by_id(self, line):
1009        """
1010        Description: Write char by characteristic id reference.
1011        Assumptions: Already connected to a GATT server service.
1012        Input(s):
1013            characteristic_id: The characteristic id reference on the GATT
1014                service
1015            offset: The offset value to use
1016            size: Function will generate random bytes by input size.
1017                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1018        Usage:
1019          Examples:
1020            gattc_write_char_by_id char_id 0 5
1021            gattc_write_char_by_id char_id 20 1
1022        """
1023        cmd = "Write to GATT server characteristic ."
1024        try:
1025            args = line.split()
1026            if len(args) != 3:
1027                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1028                return
1029            id = int(args[0], 16)
1030            offset = int(args[1])
1031            size = int(args[2])
1032            write_value = []
1033            for i in range(size):
1034                write_value.append(i % 256)
1035            self.test_dut.gatt_client_write_characteristic_by_handle(
1036                self.unique_mac_addr_id, id, offset, write_value)
1037        except Exception as err:
1038            self.log.error(FAILURE.format(cmd, err))
1039
1040    def do_gattc_write_long_char_by_id(self, line):
1041        """
1042        Description: Write long char by characteristic id reference.
1043        Assumptions: Already connected to a GATT server service.
1044        Input(s):
1045            characteristic_id: The characteristic id reference on the GATT
1046                service
1047            offset: The offset value to use
1048            size: Function will generate random bytes by input size.
1049                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1050            reliable_mode: Optional: Reliable writes represented as bool
1051        Usage:
1052          Examples:
1053            gattc_write_long_char_by_id char_id 0 5
1054            gattc_write_long_char_by_id char_id 20 1
1055            gattc_write_long_char_by_id char_id 20 1 true
1056            gattc_write_long_char_by_id char_id 20 1 false
1057        """
1058        cmd = "Long Write to GATT server characteristic ."
1059        try:
1060            args = line.split()
1061            if len(args) < 3:
1062                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1063                return
1064            id = int(args[0], 16)
1065            offset = int(args[1])
1066            size = int(args[2])
1067            reliable_mode = False
1068            if len(args) > 3:
1069                reliable_mode = self.str_to_bool(args[3])
1070            write_value = []
1071            for i in range(size):
1072                write_value.append(i % 256)
1073            self.test_dut.gatt_client_write_long_characteristic_by_handle(
1074                self.unique_mac_addr_id, id, offset, write_value,
1075                reliable_mode)
1076        except Exception as err:
1077            self.log.error(FAILURE.format(cmd, err))
1078
1079    def do_gattc_write_long_desc_by_id(self, line):
1080        """
1081        Description: Write long char by descrioptor id reference.
1082        Assumptions: Already connected to a GATT server service.
1083        Input(s):
1084            characteristic_id: The characteristic id reference on the GATT
1085                service
1086            offset: The offset value to use
1087            size: Function will generate random bytes by input size.
1088                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1089        Usage:
1090          Examples:
1091            gattc_write_long_desc_by_id char_id 0 5
1092            gattc_write_long_desc_by_id char_id 20 1
1093        """
1094        cmd = "Long Write to GATT server descriptor ."
1095        try:
1096            args = line.split()
1097            if len(args) != 3:
1098                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1099                return
1100            id = int(args[0], 16)
1101            offset = int(args[1])
1102            size = int(args[2])
1103            write_value = []
1104            for i in range(size):
1105                write_value.append(i % 256)
1106            self.test_dut.gatt_client_write_long_descriptor_by_handle(
1107                self.unique_mac_addr_id, id, offset, write_value)
1108        except Exception as err:
1109            self.log.error(FAILURE.format(cmd, err))
1110
1111    def do_gattc_write_char_by_id_without_response(self, line):
1112        """
1113        Description: Write char by characteristic id reference without response.
1114        Assumptions: Already connected to a GATT server service.
1115        Input(s):
1116            characteristic_id: The characteristic id reference on the GATT
1117                service
1118            size: Function will generate random bytes by input size.
1119                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1120        Usage:
1121          Examples:
1122            gattc_write_char_by_id_without_response char_id 5
1123        """
1124        cmd = "Write characteristic by id without response."
1125        try:
1126            args = line.split()
1127            if len(args) != 2:
1128                self.log.info("2 Arguments required: [Id] [Size]")
1129                return
1130            id = int(args[0], 16)
1131            size = args[1]
1132            write_value = []
1133            for i in range(int(size)):
1134                write_value.append(i % 256)
1135            self.test_dut.gatt_client_write_characteristic_without_response_by_handle(
1136                self.unique_mac_addr_id, id, write_value)
1137        except Exception as err:
1138            self.log.error(FAILURE.format(cmd, err))
1139
1140    def do_gattc_enable_notify_char_by_id(self, line):
1141        """
1142        Description: Enable Characteristic notification on Characteristic ID.
1143        Assumptions: Already connected to a GATT server service.
1144        Input(s):
1145            characteristic_id: The characteristic id reference on the GATT
1146                service
1147        Usage:
1148          Examples:
1149            gattc_enable_notify_char_by_id char_id
1150        """
1151        cmd = "Enable notifications by Characteristic id."
1152        try:
1153            id = int(line, 16)
1154            self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle(
1155                self.unique_mac_addr_id, id)
1156        except Exception as err:
1157            self.log.error(FAILURE.format(cmd, err))
1158
1159    def do_gattc_disable_notify_char_by_id(self, line):
1160        """
1161        Description: Disable Characteristic notification on Characteristic ID.
1162        Assumptions: Already connected to a GATT server service.
1163        Input(s):
1164            characteristic_id: The characteristic id reference on the GATT
1165                service
1166        Usage:
1167          Examples:
1168            gattc_disable_notify_char_by_id char_id
1169        """
1170        cmd = "Disable notify Characteristic by id."
1171        try:
1172            id = int(line, 16)
1173            self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle(
1174                self.unique_mac_addr_id, id)
1175        except Exception as err:
1176            self.log.error(FAILURE.format(cmd, err))
1177
1178    def do_gattc_read_char_by_id(self, line):
1179        """
1180        Description: Read Characteristic by ID.
1181        Assumptions: Already connected to a GATT server service.
1182        Input(s):
1183            characteristic_id: The characteristic id reference on the GATT
1184                service
1185        Usage:
1186          Examples:
1187            gattc_read_char_by_id char_id
1188        """
1189        cmd = "Read Characteristic value by ID."
1190        try:
1191            id = int(line, 16)
1192            read_val = self.test_dut.gatt_client_read_characteristic_by_handle(
1193                self.unique_mac_addr_id, id)
1194            self.log.info("Characteristic Value with id {}: {}".format(
1195                id, read_val))
1196        except Exception as err:
1197            self.log.error(FAILURE.format(cmd, err))
1198
1199    def do_gattc_read_char_by_uuid(self, characteristic_uuid):
1200        """
1201        Description: Read Characteristic by UUID (read by type).
1202        Assumptions: Already connected to a GATT server service.
1203        Input(s):
1204            characteristic_uuid: The characteristic id reference on the GATT
1205                service
1206        Usage:
1207          Examples:
1208            gattc_read_char_by_id char_id
1209        """
1210        cmd = "Read Characteristic value by ID."
1211        try:
1212            short_uuid_len = 4
1213            if len(characteristic_uuid) == short_uuid_len:
1214                characteristic_uuid = BASE_UUID.format(characteristic_uuid)
1215
1216            read_val = self.test_dut.gatt_client_read_characteristic_by_uuid(
1217                self.unique_mac_addr_id, characteristic_uuid)
1218            self.log.info("Characteristic Value with id {}: {}".format(
1219                id, read_val))
1220        except Exception as err:
1221            self.log.error(FAILURE.format(cmd, err))
1222
1223    def do_gattc_write_desc_by_id(self, line):
1224        """
1225        Description: Write Descriptor by characteristic id reference.
1226        Assumptions: Already connected to a GATT server service.
1227        Input(s):
1228            descriptor_id: The Descriptor id reference on the GATT service
1229            offset: The offset value to use
1230            size: Function will generate random bytes by input size.
1231                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1232        Usage:
1233          Examples:
1234            gattc_write_desc_by_id desc_id 0 5
1235            gattc_write_desc_by_id desc_id 20 1
1236        """
1237        cmd = "Write Descriptor by id."
1238        try:
1239            args = line.split()
1240            id = int(args[0], 16)
1241            offset = int(args[1])
1242            size = args[2]
1243            write_value = []
1244            for i in range(int(size)):
1245                write_value.append(i % 256)
1246            write_result = self.test_dut.gatt_client_write_descriptor_by_handle(
1247                self.unique_mac_addr_id, id, offset, write_value)
1248            self.log.info("Descriptor Write result {}: {}".format(
1249                id, write_result))
1250        except Exception as err:
1251            self.log.error(FAILURE.format(cmd, err))
1252
1253    def do_gattc_read_desc_by_id(self, line):
1254        """
1255        Description: Read Descriptor by ID.
1256        Assumptions: Already connected to a GATT server service.
1257        Input(s):
1258            descriptor_id: The Descriptor id reference on the GATT service
1259        Usage:
1260          Examples:
1261            gattc_read_desc_by_id desc_id
1262        """
1263        cmd = "Read Descriptor by ID."
1264        try:
1265            id = int(line, 16)
1266            read_val = self.test_dut.gatt_client_read_descriptor_by_handle(
1267                self.unique_mac_addr_id, id)
1268            self.log.info("Descriptor Value with id {}: {}".format(
1269                id, read_val))
1270        except Exception as err:
1271            self.log.error(FAILURE.format(cmd, err))
1272
1273    def do_gattc_read_long_char_by_id(self, line):
1274        """
1275        Description: Read long Characteristic value by id.
1276        Assumptions: Already connected to a GATT server service.
1277        Input(s):
1278            characteristic_id: The characteristic id reference on the GATT
1279                service
1280            offset: The offset value to use.
1281            max_bytes: The max bytes size to return.
1282        Usage:
1283          Examples:
1284            gattc_read_long_char_by_id char_id 0 10
1285            gattc_read_long_char_by_id char_id 20 1
1286        """
1287        cmd = "Read long Characteristic value by id."
1288        try:
1289            args = line.split()
1290            if len(args) != 3:
1291                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1292                return
1293            id = int(args[0], 16)
1294            offset = int(args[1])
1295            max_bytes = int(args[2])
1296            read_val = self.test_dut.gatt_client_read_long_characteristic_by_handle(
1297                self.unique_mac_addr_id, id, offset, max_bytes)
1298            self.log.info("Characteristic Value with id {}: {}".format(
1299                id, read_val['result']))
1300
1301        except Exception as err:
1302            self.log.error(FAILURE.format(cmd, err))
1303
1304    """End GATT client wrappers"""
1305    """Begin LE scan wrappers"""
1306
1307    def _update_scan_results(self, scan_results):
1308        self.le_ids = []
1309        for scan in scan_results['result']:
1310            self.le_ids.append(scan['id'])
1311
1312    def do_ble_start_scan(self, line):
1313        """
1314        Description: Perform a BLE scan.
1315        Default filter name: ""
1316        Optional input: filter_device_name
1317        Usage:
1318          Examples:
1319            ble_start_scan
1320            ble_start_scan eddystone
1321        """
1322        cmd = "Perform a BLE scan and list discovered devices."
1323        try:
1324            scan_filter = {"name_substring": ""}
1325            if line:
1326                scan_filter = {"name_substring": line}
1327            self.pri_dut.sl4f.gattc_lib.bleStartBleScan(scan_filter)
1328        except Exception as err:
1329            self.log.error(FAILURE.format(cmd, err))
1330
1331    def do_ble_stop_scan(self, line):
1332        """
1333        Description: Stops a BLE scan and returns discovered devices.
1334        Usage:
1335          Examples:
1336            ble_stop_scan
1337        """
1338        cmd = "Stops a BLE scan and returns discovered devices."
1339        try:
1340            scan_results = self.pri_dut.sl4f.gattc_lib.bleStopBleScan()
1341            self._update_scan_results(scan_results)
1342            self.log.info(pprint.pformat(scan_results))
1343        except Exception as err:
1344            self.log.error(FAILURE.format(cmd, err))
1345
1346    def do_ble_get_discovered_devices(self, line):
1347        """
1348        Description: Get discovered LE devices of an active scan.
1349        Usage:
1350          Examples:
1351            ble_stop_scan
1352        """
1353        cmd = "Get discovered LE devices of an active scan."
1354        try:
1355            scan_results = self.pri_dut.sl4f.gattc_lib.bleGetDiscoveredDevices(
1356            )
1357            self._update_scan_results(scan_results)
1358            self.log.info(pprint.pformat(scan_results))
1359        except Exception as err:
1360            self.log.error(FAILURE.format(cmd, err))
1361
1362    """End LE scan wrappers"""
1363    """Begin GATT Server wrappers"""
1364
1365    def do_gatts_close(self, line):
1366        """
1367        Description: Close active GATT server.
1368
1369        Usage:
1370          Examples:
1371            gatts_close
1372        """
1373        cmd = "Close active GATT server."
1374        try:
1375            result = self.pri_dut.sl4f.gatts_lib.closeServer()
1376            self.log.info(result)
1377        except Exception as err:
1378            self.log.error(FAILURE.format(cmd, err))
1379
1380    def complete_gatts_setup_database(self, text, line, begidx, endidx):
1381        if not text:
1382            completions = list(
1383                gatt_test_database.GATT_SERVER_DB_MAPPING.keys())
1384        else:
1385            completions = [
1386                s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys()
1387                if s.startswith(text)
1388            ]
1389        return completions
1390
1391    def do_gatts_setup_database(self, line):
1392        """
1393        Description: Setup a Gatt server database based on pre-defined inputs.
1394            Supports Tab Autocomplete.
1395        Input(s):
1396            descriptor_db_name: The descriptor db name that matches one in
1397                acts_contrib.test_utils.bt.gatt_test_database
1398        Usage:
1399          Examples:
1400            gatts_setup_database LARGE_DB_1
1401        """
1402        cmd = "Setup GATT Server Database Based of pre-defined dictionaries"
1403        try:
1404            scan_results = self.pri_dut.sl4f.gatts_lib.publishServer(
1405                gatt_test_database.GATT_SERVER_DB_MAPPING.get(line))
1406            self.log.info(scan_results)
1407        except Exception as err:
1408            self.log.error(FAILURE.format(cmd, err))
1409
1410    """End GATT Server wrappers"""
1411    """Begin Bluetooth Controller wrappers"""
1412
1413    def complete_btc_pair(self, text, line, begidx, endidx):
1414        """ Provides auto-complete for btc_pair cmd.
1415
1416        See Cmd module for full description.
1417        """
1418        arg_completion = len(line.split(" ")) - 1
1419        pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE']
1420        bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE']
1421        transport_options = ['BREDR', 'LE']
1422        if arg_completion == 1:
1423            if not text:
1424                completions = pairing_security_level_options
1425            else:
1426                completions = [
1427                    s for s in pairing_security_level_options
1428                    if s.startswith(text)
1429                ]
1430            return completions
1431        if arg_completion == 2:
1432            if not text:
1433                completions = bondable_options
1434            else:
1435                completions = [
1436                    s for s in bondable_options if s.startswith(text)
1437                ]
1438            return completions
1439        if arg_completion == 3:
1440            if not text:
1441                completions = transport_options
1442            else:
1443                completions = [
1444                    s for s in transport_options if s.startswith(text)
1445                ]
1446            return completions
1447
1448    def do_btc_pair(self, line):
1449        """
1450        Description: Sends an outgoing pairing request.
1451
1452        Input(s):
1453            pairing security level: ENCRYPTED, AUTHENTICATED, or NONE
1454            bondable: BONDABLE, NON_BONDABLE, or NONE
1455            transport: BREDR or LE
1456
1457        Usage:
1458          Examples:
1459            btc_pair NONE NONE BREDR
1460            btc_pair ENCRYPTED NONE LE
1461            btc_pair AUTHENTICATED NONE LE
1462            btc_pair NONE NON_BONDABLE BREDR
1463        """
1464        cmd = "Send an outgoing pairing request."
1465        pairing_security_level_mapping = {
1466            "ENCRYPTED": 1,
1467            "AUTHENTICATED": 2,
1468            "NONE": None,
1469        }
1470
1471        bondable_mapping = {
1472            "BONDABLE": True,
1473            "NON_BONDABLE": False,
1474            "NONE": None,
1475        }
1476
1477        transport_mapping = {
1478            "BREDR": 1,
1479            "LE": 2,
1480        }
1481
1482        try:
1483            options = line.split(" ")
1484            result = self.test_dut.init_pair(
1485                self.unique_mac_addr_id,
1486                pairing_security_level_mapping.get(options[0]),
1487                bondable_mapping.get(options[1]),
1488                transport_mapping.get(options[2]),
1489            )
1490            self.log.info(result)
1491        except Exception as err:
1492            self.log.error(FAILURE.format(cmd, err))
1493
1494    def complete_btc_accept_pairing(self, text, line, begidx, endidx):
1495        """ Provides auto-complete for btc_set_io_capabilities cmd.
1496
1497        See Cmd module for full description.
1498        """
1499        arg_completion = len(line.split(" ")) - 1
1500        input_options = ['NONE', 'CONFIRMATION', 'KEYBOARD']
1501        output_options = ['NONE', 'DISPLAY']
1502        if arg_completion == 1:
1503            if not text:
1504                completions = input_options
1505            else:
1506                completions = [s for s in input_options if s.startswith(text)]
1507            return completions
1508        if arg_completion == 2:
1509            if not text:
1510                completions = output_options
1511            else:
1512                completions = [s for s in output_options if s.startswith(text)]
1513            return completions
1514
1515    def do_btc_accept_pairing(self, line):
1516        """
1517        Description: Accept all incoming pairing requests.
1518
1519        Input(s):
1520            input: String - The input I/O capabilities to use
1521                Available Values:
1522                NONE - Input capability type None
1523                CONFIRMATION - Input capability type confirmation
1524                KEYBOARD - Input capability type Keyboard
1525            output: String - The output I/O Capabilities to use
1526                Available Values:
1527                NONE - Output capability type None
1528                DISPLAY - output capability type Display
1529
1530        Usage:
1531          Examples:
1532            btc_accept_pairing
1533            btc_accept_pairing NONE DISPLAY
1534            btc_accept_pairing NONE NONE
1535            btc_accept_pairing KEYBOARD DISPLAY
1536        """
1537        cmd = "Accept incoming pairing requests"
1538        try:
1539            input_capabilities = "NONE"
1540            output_capabilities = "NONE"
1541            options = line.split(" ")
1542            if len(options) > 1:
1543                input_capabilities = options[0]
1544                output_capabilities = options[1]
1545            result = self.pri_dut.sl4f.bts_lib.acceptPairing(
1546                input_capabilities, output_capabilities)
1547            self.log.info(result)
1548        except Exception as err:
1549            self.log.error(FAILURE.format(cmd, err))
1550
1551    def do_btc_forget_device(self, line):
1552        """
1553        Description: Forget pairing of the current device under test.
1554            Current device under test is the device found by
1555            tool_refresh_unique_id from custom user param. This function
1556            will also perform a clean disconnect if actively connected.
1557
1558        Usage:
1559          Examples:
1560            btc_forget_device
1561        """
1562        cmd = "For pairing of the current device under test."
1563        try:
1564            self.log.info("Forgetting device id: {}".format(
1565                self.unique_mac_addr_id))
1566            result = self.pri_dut.sl4f.bts_lib.forgetDevice(
1567                self.unique_mac_addr_id)
1568            self.log.info(result)
1569        except Exception as err:
1570            self.log.error(FAILURE.format(cmd, err))
1571
1572    def do_btc_set_discoverable(self, discoverable):
1573        """
1574        Description: Change Bluetooth Controller discoverablility.
1575        Input(s):
1576            discoverable: true to set discoverable
1577                          false to set non-discoverable
1578        Usage:
1579          Examples:
1580            btc_set_discoverable true
1581            btc_set_discoverable false
1582        """
1583        cmd = "Change Bluetooth Controller discoverablility."
1584        try:
1585            result = self.test_dut.set_discoverable(
1586                self.str_to_bool(discoverable))
1587            self.log.info(result)
1588        except Exception as err:
1589            self.log.error(FAILURE.format(cmd, err))
1590
1591    def do_btc_set_name(self, name):
1592        """
1593        Description: Change Bluetooth Controller local name.
1594        Input(s):
1595            name: The name to set the Bluetooth Controller name to.
1596
1597        Usage:
1598          Examples:
1599            btc_set_name fs_test
1600        """
1601        cmd = "Change Bluetooth Controller local name."
1602        try:
1603            result = self.test_dut.set_bluetooth_local_name(name)
1604            self.log.info(result)
1605        except Exception as err:
1606            self.log.error(FAILURE.format(cmd, err))
1607
1608    def do_btc_request_discovery(self, discover):
1609        """
1610        Description: Change whether the Bluetooth Controller is in active.
1611            discovery or not.
1612        Input(s):
1613            discover: true to start discovery
1614                      false to end discovery
1615        Usage:
1616          Examples:
1617            btc_request_discovery true
1618            btc_request_discovery false
1619        """
1620        cmd = "Change whether the Bluetooth Controller is in active."
1621        try:
1622            result = self.pri_dut.sl4f.bts_lib.requestDiscovery(
1623                self.str_to_bool(discover))
1624            self.log.info(result)
1625        except Exception as err:
1626            self.log.error(FAILURE.format(cmd, err))
1627
1628    def do_btc_get_known_remote_devices(self, line):
1629        """
1630        Description: Get a list of known devices.
1631
1632        Usage:
1633          Examples:
1634            btc_get_known_remote_devices
1635        """
1636        cmd = "Get a list of known devices."
1637        self.bt_control_devices = []
1638        try:
1639            device_list = self.pri_dut.sl4f.bts_lib.getKnownRemoteDevices(
1640            )['result']
1641            for id_dict in device_list:
1642                device = device_list[id_dict]
1643                self.bt_control_devices.append(device)
1644                self.log.info("Device found {}".format(device))
1645
1646        except Exception as err:
1647            self.log.error(FAILURE.format(cmd, err))
1648
1649    def do_btc_forget_all_known_devices(self, line):
1650        """
1651        Description: Forget all known devices.
1652
1653        Usage:
1654          Examples:
1655            btc_forget_all_known_devices
1656        """
1657        cmd = "Forget all known devices."
1658        try:
1659            device_list = self.pri_dut.sl4f.bts_lib.getKnownRemoteDevices(
1660            )['result']
1661            for device in device_list:
1662                d = device_list[device]
1663                if d['bonded'] or d['connected']:
1664                    self.log.info("Unbonding deivce: {}".format(d))
1665                    self.log.info(
1666                        self.pri_dut.sl4f.bts_lib.forgetDevice(
1667                            d['id'])['result'])
1668        except Exception as err:
1669            self.log.error(FAILURE.format(cmd, err))
1670
1671    def do_btc_connect_device(self, line):
1672        """
1673        Description: Connect to device under test.
1674            Device under test is specified by either user params
1675            or
1676                tool_set_target_device_name <name>
1677                do_tool_refresh_unique_id_using_bt_control
1678
1679        Usage:
1680          Examples:
1681            btc_connect_device
1682        """
1683        cmd = "Connect to device under test."
1684        try:
1685            result = self.pri_dut.sl4f.bts_lib.connectDevice(
1686                self.unique_mac_addr_id)
1687            self.log.info(result)
1688        except Exception as err:
1689            self.log.error(FAILURE.format(cmd, err))
1690
1691    def complete_btc_connect_device_by_id(self, text, line, begidx, endidx):
1692        if not text:
1693            completions = list(self.bt_control_ids)[:]
1694        else:
1695            completions = [
1696                s for s in self.bt_control_ids if s.startswith(text)
1697            ]
1698        return completions
1699
1700    def do_btc_connect_device_by_id(self, device_id):
1701        """
1702        Description: Connect to device id based on pre-defined inputs.
1703            Supports Tab Autocomplete.
1704        Input(s):
1705            device_id: The device id to connect to.
1706
1707        Usage:
1708          Examples:
1709            btc_connect_device_by_id <device_id>
1710        """
1711        cmd = "Connect to device id based on pre-defined inputs."
1712        try:
1713            result = self.pri_dut.sl4f.bts_lib.connectDevice(device_id)
1714            self.log.info(result)
1715        except Exception as err:
1716            self.log.error(FAILURE.format(cmd, err))
1717
1718    def complete_btc_connect_device_by_name(self, text, line, begidx, endidx):
1719        if not text:
1720            completions = list(self.bt_control_names)[:]
1721        else:
1722            completions = [
1723                s for s in self.bt_control_names if s.startswith(text)
1724            ]
1725        return completions
1726
1727    def do_btc_connect_device_by_name(self, device_name):
1728        """
1729        Description: Connect to device id based on pre-defined inputs.
1730            Supports Tab Autocomplete.
1731        Input(s):
1732            device_id: The device id to connect to.
1733
1734        Usage:
1735          Examples:
1736            btc_connect_device_by_name <device_id>
1737        """
1738        cmd = "Connect to device name based on pre-defined inputs."
1739        try:
1740            for device in self.bt_control_devices:
1741                if device_name is device['name']:
1742
1743                    result = self.pri_dut.sl4f.bts_lib.connectDevice(
1744                        device['id'])
1745                    self.log.info(result)
1746        except Exception as err:
1747            self.log.error(FAILURE.format(cmd, err))
1748
1749    def do_btc_disconnect_device(self, line):
1750        """
1751        Description: Disconnect to device under test.
1752            Device under test is specified by either user params
1753            or
1754                tool_set_target_device_name <name>
1755                do_tool_refresh_unique_id_using_bt_control
1756
1757        Usage:
1758          Examples:
1759            btc_disconnect_device
1760        """
1761        cmd = "Disconnect to device under test."
1762        try:
1763            result = self.pri_dut.sl4f.bts_lib.disconnectDevice(
1764                self.unique_mac_addr_id)
1765            self.log.info(result)
1766        except Exception as err:
1767            self.log.error(FAILURE.format(cmd, err))
1768
1769    def do_btc_init_bluetooth_control(self, line):
1770        """
1771        Description: Initialize the Bluetooth Controller.
1772
1773        Usage:
1774          Examples:
1775            btc_init_bluetooth_control
1776        """
1777        cmd = "Initialize the Bluetooth Controller."
1778        try:
1779            result = self.test_dut.initialize_bluetooth_controller()
1780            self.log.info(result)
1781        except Exception as err:
1782            self.log.error(FAILURE.format(cmd, err))
1783
1784    def do_btc_get_local_address(self, line):
1785        """
1786        Description: Get the local BR/EDR address of the Bluetooth Controller.
1787
1788        Usage:
1789          Examples:
1790            btc_get_local_address
1791        """
1792        cmd = "Get the local BR/EDR address of the Bluetooth Controller."
1793        try:
1794            result = self.test_dut.get_local_bluetooth_address()
1795            self.log.info(result)
1796        except Exception as err:
1797            self.log.error(FAILURE.format(cmd, err))
1798
1799    def do_btc_input_pairing_pin(self, line):
1800        """
1801        Description: Sends a pairing pin to SL4F's Bluetooth Control's
1802        Pairing Delegate.
1803
1804        Usage:
1805          Examples:
1806            btc_input_pairing_pin 123456
1807        """
1808        cmd = "Input pairing pin to the Fuchsia device."
1809        try:
1810            result = self.pri_dut.sl4f.bts_lib.inputPairingPin(line)['result']
1811            self.log.info(result)
1812        except Exception as err:
1813            self.log.error(FAILURE.format(cmd, err))
1814
1815    def do_btc_get_pairing_pin(self, line):
1816        """
1817        Description: Gets the pairing pin from SL4F's Bluetooth Control's
1818        Pairing Delegate.
1819
1820        Usage:
1821          Examples:
1822            btc_get_pairing_pin
1823        """
1824        cmd = "Get the pairing pin from the Fuchsia device."
1825        try:
1826            result = self.pri_dut.sl4f.bts_lib.getPairingPin()['result']
1827            self.log.info(result)
1828        except Exception as err:
1829            self.log.error(FAILURE.format(cmd, err))
1830
1831    """End Bluetooth Control wrappers"""
1832    """Begin Profile Server wrappers"""
1833
1834    def do_sdp_pts_example(self, num_of_records):
1835        """
1836        Description: An example of how to setup a generic SDP record
1837            and SDP search capabilities. This example will pass a few
1838            SDP tests.
1839
1840        Input(s):
1841            num_of_records: The number of records to add.
1842
1843        Usage:
1844          Examples:
1845            sdp_pts_example 1
1846            sdp pts_example 10
1847        """
1848        cmd = "Setup SDP for PTS testing."
1849
1850        attributes = [
1851            bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'],
1852            bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'],
1853            bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'],
1854            bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'],
1855        ]
1856
1857        try:
1858            self.pri_dut.sl4f.sdp_lib.addSearch(
1859                attributes, int(sig_uuid_constants['AudioSource'], 16))
1860            self.pri_dut.sl4f.sdp_lib.addSearch(
1861                attributes, int(sig_uuid_constants['A/V_RemoteControl'], 16))
1862            self.pri_dut.sl4f.sdp_lib.addSearch(
1863                attributes, int(sig_uuid_constants['PANU'], 16))
1864            self.pri_dut.sl4f.sdp_lib.addSearch(
1865                attributes, int(sig_uuid_constants['SerialPort'], 16))
1866            self.pri_dut.sl4f.sdp_lib.addSearch(
1867                attributes, int(sig_uuid_constants['DialupNetworking'], 16))
1868            self.pri_dut.sl4f.sdp_lib.addSearch(
1869                attributes, int(sig_uuid_constants['OBEXObjectPush'], 16))
1870            self.pri_dut.sl4f.sdp_lib.addSearch(
1871                attributes, int(sig_uuid_constants['OBEXFileTransfer'], 16))
1872            self.pri_dut.sl4f.sdp_lib.addSearch(
1873                attributes, int(sig_uuid_constants['Headset'], 16))
1874            self.pri_dut.sl4f.sdp_lib.addSearch(
1875                attributes, int(sig_uuid_constants['HandsfreeAudioGateway'],
1876                                16))
1877            self.pri_dut.sl4f.sdp_lib.addSearch(
1878                attributes, int(sig_uuid_constants['Handsfree'], 16))
1879            self.pri_dut.sl4f.sdp_lib.addSearch(
1880                attributes, int(sig_uuid_constants['SIM_Access'], 16))
1881            for i in range(int(num_of_records)):
1882                result = self.pri_dut.sl4f.sdp_lib.addService(
1883                    sdp_pts_record_list[i])
1884                self.log.info(result)
1885        except Exception as err:
1886            self.log.error(FAILURE.format(cmd, err))
1887
1888    def do_sdp_cleanup(self, line):
1889        """
1890        Description: Cleanup any existing SDP records
1891
1892        Usage:
1893          Examples:
1894            sdp_cleanup
1895        """
1896        cmd = "Cleanup SDP objects."
1897        try:
1898            result = self.pri_dut.sl4f.sdp_lib.cleanUp()
1899            self.log.info(result)
1900        except Exception as err:
1901            self.log.error(FAILURE.format(cmd, err))
1902
1903    def do_sdp_init(self, line):
1904        """
1905        Description: Init the profile proxy for setting up SDP records
1906
1907        Usage:
1908          Examples:
1909            sdp_init
1910        """
1911        cmd = "Initialize profile proxy objects for adding SDP records"
1912        try:
1913            result = self.pri_dut.sl4f.sdp_lib.init()
1914            self.log.info(result)
1915        except Exception as err:
1916            self.log.error(FAILURE.format(cmd, err))
1917
1918    def do_sdp_connect_l2cap(self, line):
1919        """
1920        Description: Send an l2cap connection request over an input psm value.
1921
1922        Note: Must be already connected to a peer.
1923
1924        Input(s):
1925            psm: The int hex value to connect over. Available PSMs:
1926                SDP 0x0001  See Bluetooth Service Discovery Protocol (SDP)
1927                RFCOMM  0x0003  See RFCOMM with TS 07.10
1928                TCS-BIN 0x0005  See Bluetooth Telephony Control Specification /
1929                    TCS Binary
1930                TCS-BIN-CORDLESS    0x0007  See Bluetooth Telephony Control
1931                    Specification / TCS Binary
1932                BNEP    0x000F  See Bluetooth Network Encapsulation Protocol
1933                HID_Control 0x0011  See Human Interface Device
1934                HID_Interrupt   0x0013  See Human Interface Device
1935                UPnP    0x0015  See [ESDP]
1936                AVCTP   0x0017  See Audio/Video Control Transport Protocol
1937                AVDTP   0x0019  See Audio/Video Distribution Transport Protocol
1938                AVCTP_Browsing  0x001B  See Audio/Video Remote Control Profile
1939                UDI_C-Plane 0x001D  See the Unrestricted Digital Information
1940                    Profile [UDI]
1941                ATT 0x001F  See Bluetooth Core Specification​
1942                ​3DSP   0x0021​ ​​See 3D Synchronization Profile.
1943                ​LE_PSM_IPSP    ​0x0023 ​See Internet Protocol Support Profile
1944                    (IPSP)
1945                OTS 0x0025  See Object Transfer Service (OTS)
1946                EATT    0x0027  See Bluetooth Core Specification
1947            mode: String - The channel mode to connect to. Available values:
1948                Basic mode: BASIC
1949                Enhanced Retransmission mode: ERTM
1950
1951        Usage:
1952          Examples:
1953            sdp_connect_l2cap 0001 BASIC
1954            sdp_connect_l2cap 0019 ERTM
1955        """
1956        cmd = "Connect l2cap"
1957        try:
1958            info = line.split()
1959            result = self.pri_dut.sl4f.sdp_lib.connectL2cap(
1960                self.unique_mac_addr_id, int(info[0], 16), info[1])
1961            self.log.info(result)
1962        except Exception as err:
1963            self.log.error(FAILURE.format(cmd, err))
1964
1965    """End Profile Server wrappers"""
1966    """Begin AVDTP wrappers"""
1967
1968    def do_avdtp_init(self, initiator_delay):
1969        """
1970        Description: Init the A2DP component start and AVDTP service to
1971            initiate.
1972
1973        Input(s):
1974            initiator_delay: [Optional] The stream initiator delay to set in
1975            milliseconds.
1976
1977        Usage:
1978          Examples:
1979            avdtp_init 0
1980            avdtp_init 2000
1981            avdtp_init
1982        """
1983        cmd = "Initialize AVDTP proxy"
1984        try:
1985            if not initiator_delay:
1986                initiator_delay = None
1987            result = self.pri_dut.sl4f.avdtp_lib.init(initiator_delay)
1988            self.log.info(result)
1989        except Exception as err:
1990            self.log.error(FAILURE.format(cmd, err))
1991
1992    def do_avdtp_kill_a2dp(self, line):
1993        """
1994        Description: Quickly kill any A2DP components.
1995
1996        Usage:
1997          Examples:
1998            avdtp_kill_a2dp
1999        """
2000        cmd = "Kill A2DP service"
2001        try:
2002            self.pri_dut.start_v1_component("bt-a2dp")
2003        except Exception as err:
2004            self.log.error(FAILURE.format(cmd, err))
2005
2006    def do_avdtp_get_connected_peers(self, line):
2007        """
2008        Description: Get the connected peers for the AVDTP service
2009
2010        Usage:
2011          Examples:
2012            avdtp_get_connected_peers
2013        """
2014        cmd = "AVDTP get connected peers"
2015        try:
2016            result = self.pri_dut.sl4f.avdtp_lib.getConnectedPeers()
2017            self.log.info(result)
2018        except Exception as err:
2019            self.log.error(FAILURE.format(cmd, err))
2020
2021    def do_avdtp_set_configuration(self, peer_id):
2022        """
2023        Description: Send AVDTP command to connected peer: set configuration
2024
2025        Input(s):
2026            peer_id: The specified peer_id.
2027
2028        Usage:
2029          Examples:
2030            avdtp_set_configuration <peer_id>
2031        """
2032        cmd = "Send AVDTP set configuration to connected peer"
2033        try:
2034            result = self.pri_dut.sl4f.avdtp_lib.setConfiguration(int(peer_id))
2035            self.log.info(result)
2036        except Exception as err:
2037            self.log.error(FAILURE.format(cmd, err))
2038
2039    def do_avdtp_get_configuration(self, peer_id):
2040        """
2041        Description: Send AVDTP command to connected peer: get configuration
2042
2043        Input(s):
2044            peer_id: The specified peer_id.
2045
2046        Usage:
2047          Examples:
2048            avdtp_get_configuration <peer_id>
2049        """
2050        cmd = "Send AVDTP get configuration to connected peer"
2051        try:
2052            result = self.pri_dut.sl4f.avdtp_lib.getConfiguration(int(peer_id))
2053            self.log.info(result)
2054        except Exception as err:
2055            self.log.error(FAILURE.format(cmd, err))
2056
2057    def do_avdtp_get_capabilities(self, peer_id):
2058        """
2059        Description: Send AVDTP command to connected peer: get capabilities
2060
2061        Input(s):
2062            peer_id: The specified peer_id.
2063
2064        Usage:
2065          Examples:
2066            avdtp_get_capabilities <peer_id>
2067        """
2068        cmd = "Send AVDTP get capabilities to connected peer"
2069        try:
2070            result = self.pri_dut.sl4f.avdtp_lib.getCapabilities(int(peer_id))
2071            self.log.info(result)
2072        except Exception as err:
2073            self.log.error(FAILURE.format(cmd, err))
2074
2075    def do_avdtp_get_all_capabilities(self, peer_id):
2076        """
2077        Description: Send AVDTP command to connected peer: get all capabilities
2078
2079        Input(s):
2080            peer_id: The specified peer_id.
2081
2082        Usage:
2083          Examples:
2084            avdtp_get_all_capabilities <peer_id>
2085        """
2086        cmd = "Send AVDTP get all capabilities to connected peer"
2087        try:
2088            result = self.pri_dut.sl4f.avdtp_lib.getAllCapabilities(
2089                int(peer_id))
2090            self.log.info(result)
2091        except Exception as err:
2092            self.log.error(FAILURE.format(cmd, err))
2093
2094    def do_avdtp_reconfigure_stream(self, peer_id):
2095        """
2096        Description: Send AVDTP command to connected peer: reconfigure stream
2097
2098        Input(s):
2099            peer_id: The specified peer_id.
2100
2101        Usage:
2102          Examples:
2103            avdtp_reconfigure_stream <peer_id>
2104        """
2105        cmd = "Send AVDTP reconfigure stream to connected peer"
2106        try:
2107            result = self.pri_dut.sl4f.avdtp_lib.reconfigureStream(
2108                int(peer_id))
2109            self.log.info(result)
2110        except Exception as err:
2111            self.log.error(FAILURE.format(cmd, err))
2112
2113    def do_avdtp_suspend_stream(self, peer_id):
2114        """
2115        Description: Send AVDTP command to connected peer: suspend stream
2116
2117        Input(s):
2118            peer_id: The specified peer_id.
2119
2120        Usage:
2121          Examples:
2122            avdtp_suspend_stream <peer_id>
2123        """
2124        cmd = "Send AVDTP suspend stream to connected peer"
2125        try:
2126            result = self.pri_dut.sl4f.avdtp_lib.suspendStream(int(peer_id))
2127            self.log.info(result)
2128        except Exception as err:
2129            self.log.error(FAILURE.format(cmd, err))
2130
2131    def do_avdtp_suspend_reconfigure(self, peer_id):
2132        """
2133        Description: Send AVDTP command to connected peer: suspend reconfigure
2134
2135        Input(s):
2136            peer_id: The specified peer_id.
2137
2138        Usage:
2139          Examples:
2140            avdtp_suspend_reconfigure <peer_id>
2141        """
2142        cmd = "Send AVDTP suspend reconfigure to connected peer"
2143        try:
2144            result = self.pri_dut.sl4f.avdtp_lib.suspendAndReconfigure(
2145                int(peer_id))
2146            self.log.info(result)
2147        except Exception as err:
2148            self.log.error(FAILURE.format(cmd, err))
2149
2150    def do_avdtp_release_stream(self, peer_id):
2151        """
2152        Description: Send AVDTP command to connected peer: release stream
2153
2154        Input(s):
2155            peer_id: The specified peer_id.
2156
2157        Usage:
2158          Examples:
2159            avdtp_release_stream <peer_id>
2160        """
2161        cmd = "Send AVDTP release stream to connected peer"
2162        try:
2163            result = self.pri_dut.sl4f.avdtp_lib.releaseStream(int(peer_id))
2164            self.log.info(result)
2165        except Exception as err:
2166            self.log.error(FAILURE.format(cmd, err))
2167
2168    def do_avdtp_establish_stream(self, peer_id):
2169        """
2170        Description: Send AVDTP command to connected peer: establish stream
2171
2172        Input(s):
2173            peer_id: The specified peer_id.
2174
2175        Usage:
2176          Examples:
2177            avdtp_establish_stream <peer_id>
2178        """
2179        cmd = "Send AVDTP establish stream to connected peer"
2180        try:
2181            result = self.pri_dut.sl4f.avdtp_lib.establishStream(int(peer_id))
2182            self.log.info(result)
2183        except Exception as err:
2184            self.log.error(FAILURE.format(cmd, err))
2185
2186    def do_avdtp_start_stream(self, peer_id):
2187        """
2188        Description: Send AVDTP command to connected peer: start stream
2189
2190        Input(s):
2191            peer_id: The specified peer_id.
2192
2193        Usage:
2194          Examples:
2195            avdtp_start_stream <peer_id>
2196        """
2197        cmd = "Send AVDTP start stream to connected peer"
2198        try:
2199            result = self.pri_dut.sl4f.avdtp_lib.startStream(int(peer_id))
2200            self.log.info(result)
2201        except Exception as err:
2202            self.log.error(FAILURE.format(cmd, err))
2203
2204    def do_avdtp_abort_stream(self, peer_id):
2205        """
2206        Description: Send AVDTP command to connected peer: abort stream
2207
2208        Input(s):
2209            peer_id: The specified peer_id.
2210
2211        Usage:
2212          Examples:
2213            avdtp_abort_stream <peer_id>
2214        """
2215        cmd = "Send AVDTP abort stream to connected peer"
2216        try:
2217            result = self.pri_dut.sl4f.avdtp_lib.abortStream(int(peer_id))
2218            self.log.info(result)
2219        except Exception as err:
2220            self.log.error(FAILURE.format(cmd, err))
2221
2222    def do_avdtp_remove_service(self, line):
2223        """
2224        Description: Removes the AVDTP service in use.
2225
2226        Usage:
2227          Examples:
2228            avdtp_establish_stream <peer_id>
2229        """
2230        cmd = "Remove AVDTP service"
2231        try:
2232            result = self.pri_dut.sl4f.avdtp_lib.removeService()
2233            self.log.info(result)
2234        except Exception as err:
2235            self.log.error(FAILURE.format(cmd, err))
2236
2237    """End AVDTP wrappers"""
2238    """Begin Audio wrappers"""
2239
2240    def do_audio_start_output_save(self, line):
2241        """
2242        Description: Start audio output save
2243
2244        Usage:
2245          Examples:
2246            audio_start_output_save
2247        """
2248        cmd = "Start audio capture"
2249        try:
2250            result = self.pri_dut.sl4f.audio_lib.startOutputSave()
2251            self.log.info(result)
2252        except Exception as err:
2253            self.log.error(FAILURE.format(cmd, err))
2254
2255    def do_audio_stop_output_save(self, line):
2256        """
2257        Description: Stop audio output save
2258
2259        Usage:
2260          Examples:
2261            audio_stop_output_save
2262        """
2263        cmd = "Stop audio capture"
2264        try:
2265            result = self.pri_dut.sl4f.audio_lib.stopOutputSave()
2266            self.log.info(result)
2267        except Exception as err:
2268            self.log.error(FAILURE.format(cmd, err))
2269
2270    def do_audio_get_output_audio(self, line):
2271        """
2272        Description: Get the audio output saved to a local file
2273
2274        Usage:
2275          Examples:
2276            audio_get_output_audio
2277        """
2278        cmd = "Get audio capture"
2279        try:
2280            save_path = "{}/{}".format(self.pri_dut.log_path, "audio.raw")
2281            result = self.pri_dut.sl4f.audio_lib.getOutputAudio(save_path)
2282        except Exception as err:
2283            self.log.error(FAILURE.format(cmd, err))
2284
2285    def do_audio_5_min_test(self, line):
2286        """
2287        Description: Capture and anlyize sine audio waves played from a Bluetooth A2DP
2288        Source device.
2289
2290        Pre steps:
2291        1. Pair A2DP source device
2292        2. Prepare generated SOX file over preferred codec on source device.
2293            Quick way to generate necessary audio files:
2294            sudo apt-get install sox
2295            sox -b 16 -r 48000 -c 2 -n audio_file_2k1k_5_min.wav synth 300 sine 2000 sine 3000
2296
2297        Usage:
2298          Examples:
2299            audio_5_min_test
2300        """
2301        cmd = "5 min audio capture test"
2302        input("Press Enter once Source device is streaming audio file")
2303        try:
2304            result = self.pri_dut.sl4f.audio_lib.startOutputSave()
2305            self.log.info(result)
2306            for i in range(5):
2307                print("Minutes left: {}".format(10 - i))
2308                time.sleep(60)
2309            result = self.pri_dut.sl4f.audio_lib.stopOutputSave()
2310            log_time = int(time.time())
2311            save_path = "{}/{}".format(self.pri_dut.log_path,
2312                                       "{}_audio.raw".format(log_time))
2313            analysis_path = "{}/{}".format(
2314                self.pri_dut.log_path,
2315                "{}_audio_analysis.txt".format(log_time))
2316            result = self.pri_dut.sl4f.audio_lib.getOutputAudio(save_path)
2317
2318            channels = 1
2319            try:
2320                quality_analysis(filename=save_path,
2321                                 output_file=analysis_path,
2322                                 bit_width=audio_bits_per_sample_32,
2323                                 rate=audio_sample_rate_48000,
2324                                 channel=channels,
2325                                 spectral_only=False)
2326
2327            except Exception as err:
2328                self.log.error("Failed to analyze raw audio: {}".format(err))
2329                return False
2330
2331            self.log.info("Analysis output here: {}".format(analysis_path))
2332            self.log.info("Analysis Results: {}".format(
2333                open(analysis_path).readlines()))
2334        except Exception as err:
2335            self.log.error(FAILURE.format(cmd, err))
2336
2337    """End Audio wrappers"""
2338    """Begin HFP wrappers"""
2339
2340    def do_hfp_init(self, line):
2341        """
2342        Description: Init the HFP component initiate.
2343
2344        Usage:
2345          Examples:
2346            hfp_init
2347        """
2348        cmd = "Initialize HFP proxy"
2349        try:
2350            result = self.pri_dut.sl4f.hfp_lib.init()
2351            self.log.info(result)
2352        except Exception as err:
2353            self.log.error(FAILURE.format(cmd, err))
2354
2355    def do_hfp_remove_service(self, line):
2356        """
2357        Description: Removes the HFP service in use.
2358
2359        Usage:
2360          Examples:
2361            hfp_remove_service
2362        """
2363        cmd = "Remove HFP service"
2364        try:
2365            result = self.pri_dut.sl4f.hfp_lib.removeService()
2366            self.log.info(result)
2367        except Exception as err:
2368            self.log.error(FAILURE.format(cmd, err))
2369
2370    def do_hfp_list_peers(self, line):
2371        """
2372        Description: List all HFP Hands-Free peers connected to the DUT.
2373
2374        Input(s):
2375
2376        Usage:
2377          Examples:
2378            hfp_list_peers
2379        """
2380        cmd = "Lists connected peers"
2381        try:
2382            result = self.pri_dut.sl4f.hfp_lib.listPeers()
2383            self.log.info(pprint.pformat(result))
2384        except Exception as err:
2385            self.log.error(FAILURE.format(cmd, err))
2386
2387    def do_hfp_set_active_peer(self, line):
2388        """
2389        Description: Set the active HFP Hands-Free peer for the DUT.
2390
2391        Input(s):
2392            peer_id: The id of the peer to be set active.
2393
2394        Usage:
2395          Examples:
2396            hfp_set_active_peer <peer_id>
2397        """
2398        cmd = "Set the active peer"
2399        try:
2400            peer_id = int(line.strip())
2401            result = self.pri_dut.sl4f.hfp_lib.setActivePeer(peer_id)
2402            self.log.info(result)
2403        except Exception as err:
2404            self.log.error(FAILURE.format(cmd, err))
2405
2406    def do_hfp_list_calls(self, line):
2407        """
2408        Description: List all calls known to the sl4f component on the DUT.
2409
2410        Input(s):
2411
2412        Usage:
2413          Examples:
2414            hfp_list_calls
2415        """
2416        cmd = "Lists all calls"
2417        try:
2418            result = self.pri_dut.sl4f.hfp_lib.listCalls()
2419            self.log.info(pprint.pformat(result))
2420        except Exception as err:
2421            self.log.error(FAILURE.format(cmd, err))
2422
2423    def do_hfp_new_call(self, line):
2424        """
2425        Description: Simulate a call on the call manager
2426
2427        Input(s):
2428            remote: The number of the remote party on the simulated call
2429            state: The state of the call. Must be one of "ringing", "waiting",
2430                   "dialing", "alerting", "active", "held".
2431            direction: The direction of the call. Must be one of "incoming", "outgoing".
2432
2433        Usage:
2434          Examples:
2435            hfp_new_call <remote> <state> <direction>
2436            hfp_new_call 14085555555 active incoming
2437            hfp_new_call 14085555555 held outgoing
2438            hfp_new_call 14085555555 ringing incoming
2439            hfp_new_call 14085555555 waiting incoming
2440            hfp_new_call 14085555555 alerting outgoing
2441            hfp_new_call 14085555555 dialing outgoing
2442        """
2443        cmd = "Simulates a call"
2444        try:
2445            info = line.strip().split()
2446            if len(info) != 3:
2447                raise ValueError(
2448                    "Exactly three command line arguments required: <remote> <state> <direction>"
2449                )
2450            remote, state, direction = info[0], info[1], info[2]
2451            result = self.pri_dut.sl4f.hfp_lib.newCall(remote, state,
2452                                                       direction)
2453            self.log.info(result)
2454        except Exception as err:
2455            self.log.error(FAILURE.format(cmd, err))
2456
2457    def do_hfp_incoming_call(self, line):
2458        """
2459        Description: Simulate an incoming call on the call manager
2460
2461        Input(s):
2462            remote: The number of the remote party on the incoming call
2463
2464        Usage:
2465          Examples:
2466            hfp_incoming_call <remote>
2467            hfp_incoming_call 14085555555
2468        """
2469        cmd = "Simulates an incoming call"
2470        try:
2471            remote = line.strip()
2472            result = self.pri_dut.sl4f.hfp_lib.initiateIncomingCall(remote)
2473            self.log.info(result)
2474        except Exception as err:
2475            self.log.error(FAILURE.format(cmd, err))
2476
2477    def do_hfp_waiting_call(self, line):
2478        """
2479        Description: Simulate an incoming call on the call manager when there is
2480        an onging active call already.
2481
2482        Input(s):
2483            remote: The number of the remote party on the incoming call
2484
2485        Usage:
2486          Examples:
2487            hfp_waiting_call <remote>
2488            hfp_waiting_call 14085555555
2489        """
2490        cmd = "Simulates an incoming call"
2491        try:
2492            remote = line.strip()
2493            result = self.pri_dut.sl4f.hfp_lib.initiateIncomingWaitingCall(
2494                remote)
2495            self.log.info(result)
2496        except Exception as err:
2497            self.log.error(FAILURE.format(cmd, err))
2498
2499    def do_hfp_outgoing_call(self, line):
2500        """
2501        Description: Simulate an outgoing call on the call manager
2502
2503        Input(s):
2504            remote: The number of the remote party on the outgoing call
2505
2506        Usage:
2507          Examples:
2508            hfp_outgoing_call <remote>
2509        """
2510        cmd = "Simulates an outgoing call"
2511        try:
2512            remote = line.strip()
2513            result = self.pri_dut.sl4f.hfp_lib.initiateOutgoingCall(remote)
2514            self.log.info(result)
2515        except Exception as err:
2516            self.log.error(FAILURE.format(cmd, err))
2517
2518    def do_hfp_set_call_active(self, line):
2519        """
2520        Description: Set the specified call to the "OngoingActive" state.
2521
2522        Input(s):
2523            call_id: The unique id of the call.
2524
2525        Usage:
2526          Examples:
2527            hfp_outgoing_call <call_id>
2528        """
2529        cmd = "Set the specified call to active"
2530        try:
2531            call_id = int(line.strip())
2532            result = self.pri_dut.sl4f.hfp_lib.setCallActive(call_id)
2533            self.log.info(result)
2534        except Exception as err:
2535            self.log.error(FAILURE.format(cmd, err))
2536
2537    def do_hfp_set_call_held(self, line):
2538        """
2539        Description: Set the specified call to the "OngoingHeld" state.
2540
2541        Input(s):
2542            call_id: The unique id of the call.
2543
2544        Usage:
2545          Examples:
2546            hfp_outgoing_call <call_id>
2547        """
2548        cmd = "Set the specified call to held"
2549        try:
2550            call_id = int(line.strip())
2551            result = self.pri_dut.sl4f.hfp_lib.setCallHeld(call_id)
2552            self.log.info(result)
2553        except Exception as err:
2554            self.log.error(FAILURE.format(cmd, err))
2555
2556    def do_hfp_set_call_terminated(self, line):
2557        """
2558        Description: Set the specified call to the "Terminated" state.
2559
2560        Input(s):
2561            call_id: The unique id of the call.
2562
2563        Usage:
2564          Examples:
2565            hfp_outgoing_call <call_id>
2566        """
2567        cmd = "Set the specified call to terminated"
2568        try:
2569            call_id = int(line.strip())
2570            result = self.pri_dut.sl4f.hfp_lib.setCallTerminated(call_id)
2571            self.log.info(result)
2572        except Exception as err:
2573            self.log.error(FAILURE.format(cmd, err))
2574
2575    def do_hfp_set_call_transferred_to_ag(self, line):
2576        """
2577        Description: Set the specified call to the "TransferredToAg" state.
2578
2579        Input(s):
2580            call_id: The unique id of the call.
2581
2582        Usage:
2583          Examples:
2584            hfp_outgoing_call <call_id>
2585        """
2586        cmd = "Set the specified call to TransferredToAg"
2587        try:
2588            call_id = int(line.strip())
2589            result = self.pri_dut.sl4f.hfp_lib.setCallTransferredToAg(call_id)
2590            self.log.info(result)
2591        except Exception as err:
2592            self.log.error(FAILURE.format(cmd, err))
2593
2594    def do_hfp_set_speaker_gain(self, line):
2595        """
2596        Description: Set the active peer's speaker gain.
2597
2598        Input(s):
2599            value: The gain value to set. Must be between 0-15 inclusive.
2600
2601        Usage:
2602          Examples:
2603            hfp_set_speaker_gain <value>
2604        """
2605        cmd = "Set the active peer's speaker gain"
2606        try:
2607            value = int(line.strip())
2608            result = self.pri_dut.sl4f.hfp_lib.setSpeakerGain(value)
2609            self.log.info(result)
2610        except Exception as err:
2611            self.log.error(FAILURE.format(cmd, err))
2612
2613    def do_hfp_set_microphone_gain(self, line):
2614        """
2615        Description: Set the active peer's microphone gain.
2616
2617        Input(s):
2618            value: The gain value to set. Must be between 0-15 inclusive.
2619
2620        Usage:
2621          Examples:
2622            hfp_set_microphone_gain <value>
2623        """
2624        cmd = "Set the active peer's microphone gain"
2625        try:
2626            value = int(line.strip())
2627            result = self.pri_dut.sl4f.hfp_lib.setMicrophoneGain(value)
2628            self.log.info(result)
2629        except Exception as err:
2630            self.log.error(FAILURE.format(cmd, err))
2631
2632    def do_hfp_set_service_available(self, line):
2633        """
2634        Description: Sets the simulated network service status reported by the call manager.
2635
2636        Input(s):
2637            value: "true" to set the network connection to available.
2638
2639        Usage:
2640          Examples:
2641            hfp_set_service_available <value>
2642            hfp_set_service_available true
2643            hfp_set_service_available false
2644        """
2645        cmd = "Sets the simulated network service status reported by the call manager"
2646        try:
2647            value = line.strip() == "true"
2648            result = self.pri_dut.sl4f.hfp_lib.setServiceAvailable(value)
2649            self.log.info(result)
2650        except Exception as err:
2651            self.log.error(FAILURE.format(cmd, err))
2652
2653    def do_hfp_set_roaming(self, line):
2654        """
2655        Description: Sets the simulated roaming status reported by the call manager.
2656
2657        Input(s):
2658            value: "true" to set the network connection to roaming.
2659
2660        Usage:
2661          Examples:
2662            hfp_set_roaming <value>
2663            hfp_set_roaming true
2664            hfp_set_roaming false
2665        """
2666        cmd = "Sets the simulated roaming status reported by the call manager"
2667        try:
2668            value = line.strip() == "true"
2669            result = self.pri_dut.sl4f.hfp_lib.setRoaming(value)
2670            self.log.info(result)
2671        except Exception as err:
2672            self.log.error(FAILURE.format(cmd, err))
2673
2674    def do_hfp_set_signal_strength(self, line):
2675        """
2676        Description: Sets the simulated signal strength reported by the call manager.
2677
2678        Input(s):
2679            value: The signal strength value to set. Must be between 0-5 inclusive.
2680
2681        Usage:
2682          Examples:
2683            hfp_set_signal_strength <value>
2684            hfp_set_signal_strength 0
2685            hfp_set_signal_strength 3
2686            hfp_set_signal_strength 5
2687        """
2688        cmd = "Sets the simulated signal strength reported by the call manager"
2689        try:
2690            value = int(line.strip())
2691            result = self.pri_dut.sl4f.hfp_lib.setSignalStrength(value)
2692            self.log.info(result)
2693        except Exception as err:
2694            self.log.error(FAILURE.format(cmd, err))
2695
2696    def do_hfp_set_subscriber_number(self, line):
2697        """
2698        Description: Sets the subscriber number reported by the call manager.
2699
2700        Input(s):
2701            value: The subscriber number to set. Maximum length 128 characters.
2702
2703        Usage:
2704          Examples:
2705            hfp_set_subscriber_number <value>
2706            hfp_set_subscriber_number 14085555555
2707        """
2708        cmd = "Sets the subscriber number reported by the call manager"
2709        try:
2710            value = line.strip()
2711            result = self.pri_dut.sl4f.hfp_lib.setSubscriberNumber(value)
2712            self.log.info(result)
2713        except Exception as err:
2714            self.log.error(FAILURE.format(cmd, err))
2715
2716    def do_hfp_set_operator(self, line):
2717        """
2718        Description: Sets the operator value reported by the call manager.
2719
2720        Input(s):
2721            value: The operator value to set. Maximum length 16 characters.
2722
2723        Usage:
2724          Examples:
2725            hfp_set_operator <value>
2726            hfp_set_operator GoogleFi
2727        """
2728        cmd = "Sets the operator value reported by the call manager"
2729        try:
2730            value = line.strip()
2731            result = self.pri_dut.sl4f.hfp_lib.setOperator(value)
2732            self.log.info(result)
2733        except Exception as err:
2734            self.log.error(FAILURE.format(cmd, err))
2735
2736    def do_hfp_set_nrec_support(self, line):
2737        """
2738        Description: Sets the noise reduction/echo cancelation support reported by the call manager.
2739
2740        Input(s):
2741            value: The nrec support bool.
2742
2743        Usage:
2744          Examples:
2745            hfp_set_nrec_support <value>
2746            hfp_set_nrec_support true
2747            hfp_set_nrec_support false
2748        """
2749        cmd = "Sets the noise reduction/echo cancelation support reported by the call manager"
2750        try:
2751            value = line.strip() == "true"
2752            result = self.pri_dut.sl4f.hfp_lib.setNrecSupport(value)
2753            self.log.info(result)
2754        except Exception as err:
2755            self.log.error(FAILURE.format(cmd, err))
2756
2757    def do_hfp_set_battery_level(self, line):
2758        """
2759        Description: Sets the battery level reported by the call manager.
2760
2761        Input(s):
2762            value: The integer battery level value. Must be 0-5 inclusive.
2763
2764        Usage:
2765          Examples:
2766            hfp_set_battery_level <value>
2767            hfp_set_battery_level 0
2768            hfp_set_battery_level 3
2769        """
2770        cmd = "Set the battery level reported by the call manager"
2771        try:
2772            value = int(line.strip())
2773            result = self.pri_dut.sl4f.hfp_lib.setBatteryLevel(value)
2774            self.log.info(result)
2775        except Exception as err:
2776            self.log.error(FAILURE.format(cmd, err))
2777
2778    def do_hfp_set_last_dialed(self, line):
2779        """
2780        Description: Sets the last dialed number in the call manager.
2781
2782        Input(s):
2783            number: The number of the remote party.
2784
2785        Usage:
2786          Examples:
2787            hfp_set_last_dialed <number>
2788            hfp_set_last_dialed 14085555555
2789        """
2790        cmd = "Sets the last dialed number in the call manager."
2791        try:
2792            number = line.strip()
2793            result = self.pri_dut.sl4f.hfp_lib.setLastDialed(number)
2794            self.log.info(result)
2795        except Exception as err:
2796            self.log.error(FAILURE.format(cmd, err))
2797
2798    def do_hfp_clear_last_dialed(self, line):
2799        """
2800        Description: Clears the last dialed number in the call manager.
2801
2802        Usage:
2803          Examples:
2804            hfp_clear_last_dialed
2805        """
2806        cmd = "Clears the last dialed number in the call manager."
2807        try:
2808            result = self.pri_dut.sl4f.hfp_lib.clearLastDialed()
2809            self.log.info(result)
2810        except Exception as err:
2811            self.log.error(FAILURE.format(cmd, err))
2812
2813    def do_hfp_set_memory_location(self, line):
2814        """
2815        Description: Sets a memory location to point to a remote number.
2816
2817        Input(s):
2818            location: The memory location at which to store the number.
2819            number: The number of the remote party to be stored.
2820
2821        Usage:
2822          Examples:
2823            hfp_set_memory_location <location> <number>
2824            hfp_set_memory_location 0 14085555555
2825        """
2826        cmd = "Sets a memory location to point to a remote number."
2827        try:
2828            info = line.strip().split()
2829            if len(info) != 2:
2830                raise ValueError(
2831                    "Exactly two command line arguments required: <location> <number>"
2832                )
2833            location, number = info[0], info[1]
2834            result = self.pri_dut.sl4f.hfp_lib.setMemoryLocation(
2835                location, number)
2836            self.log.info(result)
2837        except Exception as err:
2838            self.log.error(FAILURE.format(cmd, err))
2839
2840    def do_hfp_clear_memory_location(self, line):
2841        """
2842        Description: Sets a memory location to point to a remote number.
2843
2844        Input(s):
2845            localtion: The memory location to clear.
2846
2847        Usage:
2848          Examples:
2849            hfp_clear_memory_location <location>
2850            hfp_clear_memory_location 0
2851        """
2852        cmd = "Sets a memory location to point to a remote number."
2853        try:
2854            location = line.strip()
2855            result = self.pri_dut.sl4f.hfp_lib.clearMemoryLocation(location)
2856            self.log.info(result)
2857        except Exception as err:
2858            self.log.error(FAILURE.format(cmd, err))
2859
2860    def do_hfp_set_dial_result(self, line):
2861        """
2862        Description: Sets the status result to be returned when the number is dialed.
2863
2864        Input(s):
2865            number: The number of the remote party.
2866            status: The status to be returned when an outgoing call is initiated to the number.
2867
2868        Usage:
2869          Examples:
2870            hfp_set_battery_level <value>
2871        """
2872        cmd = "Sets the status result to be returned when the number is dialed."
2873        try:
2874            info = line.strip().split()
2875            if len(info) != 2:
2876                raise ValueError(
2877                    "Exactly two command line arguments required: <number> <status>"
2878                )
2879            number, status = info[0], int(info[1])
2880            result = self.pri_dut.sl4f.hfp_lib.setDialResult(number, status)
2881            self.log.info(pprint.pformat(result))
2882        except Exception as err:
2883            self.log.error(FAILURE.format(cmd, err))
2884
2885    def do_hfp_get_state(self, line):
2886        """
2887        Description: Get the call manager's complete state
2888
2889        Usage:
2890          Examples:
2891            hfp_get_state
2892        """
2893        cmd = "Get the call manager's state"
2894        try:
2895            result = self.pri_dut.sl4f.hfp_lib.getState()
2896            self.log.info(pprint.pformat(result))
2897        except Exception as err:
2898            self.log.error(FAILURE.format(cmd, err))
2899
2900    def do_hfp_set_connection_behavior(self, line):
2901        """
2902        Description: Set the Service Level Connection (SLC) behavior when a new peer connects.
2903
2904        Input(s):
2905            autoconnect: Enable/Disable autoconnection of SLC.
2906
2907        Usage:
2908          Examples:
2909            hfp_set_connection_behavior <autoconnect>
2910            hfp_set_connection_behavior true
2911            hfp_set_connection_behavior false
2912        """
2913        cmd = "Set the Service Level Connection (SLC) behavior"
2914        try:
2915            autoconnect = line.strip().lower() == "true"
2916            result = self.pri_dut.sl4f.hfp_lib.setConnectionBehavior(
2917                autoconnect)
2918            self.log.info(result)
2919        except Exception as err:
2920            self.log.error(FAILURE.format(cmd, err))
2921
2922    """End HFP wrappers"""
2923    """Begin RFCOMM wrappers"""
2924
2925    def do_rfcomm_init(self, line):
2926        """
2927        Description: Initialize the RFCOMM component services.
2928
2929        Usage:
2930          Examples:
2931            rfcomm_init
2932        """
2933        cmd = "Initialize RFCOMM proxy"
2934        try:
2935            result = self.pri_dut.sl4f.rfcomm_lib.init()
2936            self.log.info(result)
2937        except Exception as err:
2938            self.log.error(FAILURE.format(cmd, err))
2939
2940    def do_rfcomm_remove_service(self, line):
2941        """
2942        Description: Removes the RFCOMM service in use.
2943
2944        Usage:
2945          Examples:
2946            rfcomm_remove_service
2947        """
2948        cmd = "Remove RFCOMM service"
2949        try:
2950            result = self.pri_dut.sl4f.rfcomm_lib.removeService()
2951            self.log.info(result)
2952        except Exception as err:
2953            self.log.error(FAILURE.format(cmd, err))
2954
2955    def do_rfcomm_disconnect_session(self, line):
2956        """
2957        Description: Closes the RFCOMM Session.
2958
2959        Usage:
2960          Examples:
2961            rfcomm_disconnect_session
2962            rfcomm_disconnect_session
2963        """
2964        cmd = "Disconnect the RFCOMM Session"
2965        try:
2966            result = self.pri_dut.sl4f.rfcomm_lib.disconnectSession(
2967                self.unique_mac_addr_id)
2968            self.log.info(result)
2969        except Exception as err:
2970            self.log.error(FAILURE.format(cmd, err))
2971
2972    def do_rfcomm_connect_rfcomm_channel(self, line):
2973        """
2974        Description: Make an outgoing RFCOMM connection.
2975
2976        Usage:
2977          Examples:
2978            rfcomm_connect_rfcomm_channel <server_channel_number>
2979            rfcomm_connect_rfcomm_channel 2
2980        """
2981        cmd = "Make an outgoing RFCOMM connection"
2982        try:
2983            server_channel_number = int(line.strip())
2984            result = self.pri_dut.sl4f.rfcomm_lib.connectRfcommChannel(
2985                self.unique_mac_addr_id, server_channel_number)
2986            self.log.info(result)
2987        except Exception as err:
2988            self.log.error(FAILURE.format(cmd, err))
2989
2990    def do_rfcomm_disconnect_rfcomm_channel(self, line):
2991        """
2992        Description: Close the RFCOMM connection with the peer
2993
2994        Usage:
2995          Examples:
2996            rfcomm_disconnect_rfcomm_channel <server_channel_number>
2997            rfcomm_disconnect_rfcomm_channel 2
2998        """
2999        cmd = "Close the RFCOMM channel"
3000        try:
3001            server_channel_number = int(line.strip())
3002            result = self.pri_dut.sl4f.rfcomm_lib.disconnectRfcommChannel(
3003                self.unique_mac_addr_id, server_channel_number)
3004            self.log.info(result)
3005        except Exception as err:
3006            self.log.error(FAILURE.format(cmd, err))
3007
3008    def do_rfcomm_send_remote_line_status(self, line):
3009        """
3010        Description: Send a remote line status for the RFCOMM channel.
3011
3012        Usage:
3013          Examples:
3014            rfcomm_send_remote_line_status <server_channel_number>
3015            rfcomm_send_remote_line_status 2
3016        """
3017        cmd = "Send a remote line status update for the RFCOMM channel"
3018        try:
3019            server_channel_number = int(line.strip())
3020            result = self.pri_dut.sl4f.rfcomm_lib.sendRemoteLineStatus(
3021                self.unique_mac_addr_id, server_channel_number)
3022            self.log.info(result)
3023        except Exception as err:
3024            self.log.error(FAILURE.format(cmd, err))
3025
3026    def do_rfcomm_write_rfcomm(self, line):
3027        """
3028        Description: Send data over the RFCOMM channel.
3029
3030        Usage:
3031          Examples:
3032            rfcomm_write_rfcomm <server_channel_number> <data>
3033            rfcomm_write_rfcomm 2 foobar
3034        """
3035        cmd = "Send data using the RFCOMM channel"
3036        try:
3037            info = line.strip().split()
3038            if len(info) != 2:
3039                raise ValueError(
3040                    "Exactly two command line arguments required: <server_channel_number> <data>"
3041                )
3042            server_channel_number = int(info[0])
3043            data = info[1]
3044            result = self.pri_dut.sl4f.rfcomm_lib.writeRfcomm(
3045                self.unique_mac_addr_id, server_channel_number, data)
3046            self.log.info(result)
3047        except Exception as err:
3048            self.log.error(FAILURE.format(cmd, err))
3049
3050    """End RFCOMM wrappers"""
3051