1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This test script is for partial automation of LE devices
18
19This script requires these custom parameters in the config file:
20
21"ble_mac_address"
22"service_uuid"
23"notifiable_char_uuid"
24"""
25
26from queue import Empty
27import time
28
29from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
30from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes
31from acts_contrib.test_utils.bt.bt_constants import gatt_cb_err
32from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings
33from acts_contrib.test_utils.bt.bt_constants import gatt_descriptor
34from acts_contrib.test_utils.bt.bt_constants import gatt_transport
35from acts_contrib.test_utils.bt.bt_constants import scan_result
36from acts_contrib.test_utils.bt.bt_gatt_utils import GattTestUtilsError
37from acts_contrib.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
38from acts_contrib.test_utils.bt.bt_test_utils import generate_ble_scan_objects
39from acts_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_connection
40from acts_contrib.test_utils.bt.bt_gatt_utils import log_gatt_server_uuids
41from acts_contrib.test_utils.bt.bt_test_utils import reset_bluetooth
42
43
class GattToolTest(BluetoothBaseTest):
    """Partially automated GATT tests driven from one Android central device.

    The tests connect to, pair with and read from the LE peripheral whose
    address is supplied through the "ble_mac_address" user parameter.
    """
    # Autoconnect flag handed to setup_gatt_connection by the tests.
    AUTOCONNECT = False
    # Timeout passed to the event dispatcher's pop_event calls.
    DEFAULT_TIMEOUT = 10
    # Seconds spent polling the bonded-device list after requesting a bond.
    PAIRING_TIMEOUT = 20
    # Not referenced by any method visible in this class.
    adv_instances = []
    # Class-level default; teardown_test rebinds a fresh list per instance.
    timer_list = []
50
51    def setup_class(self):
52        super().setup_class()
53        # Central role Android device
54        self.cen_ad = self.android_devices[0]
55        self.ble_mac_address = self.user_params['ble_mac_address']
56        self.SERVICE_UUID = self.user_params['service_uuid']
57        self.NOTIFIABLE_CHAR_UUID = self.user_params['notifiable_char_uuid']
58        # CCC == Client Characteristic Configuration
59        self.CCC_DESC_UUID = "00002902-0000-1000-8000-00805f9b34fb"
60
61    def setup_test(self):
62        super(BluetoothBaseTest, self).setup_test()
63        if not self._is_peripheral_advertising():
64            input("Press enter when peripheral is advertising...")
65        return True
66
67    def teardown_test(self):
68        super(BluetoothBaseTest, self).teardown_test()
69        self.log_stats()
70        self.timer_list = []
71        return True
72
73    def _pair_with_peripheral(self):
74        self.cen_ad.droid.bluetoothDiscoverAndBond(self.ble_mac_address)
75        end_time = time.time() + self.PAIRING_TIMEOUT
76        self.log.info("Verifying devices are bonded")
77        while time.time() < end_time:
78            bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices()
79            if self.ble_mac_address in {d['address'] for d in bonded_devices}:
80                self.log.info("Successfully bonded to device")
81                return True
82        return False
83
84    def _is_peripheral_advertising(self):
85        self.cen_ad.droid.bleSetScanFilterDeviceAddress(self.ble_mac_address)
86        self.cen_ad.droid.bleSetScanSettingsScanMode(
87            ble_scan_settings_modes['low_latency'])
88        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
89            self.cen_ad.droid)
90        self.cen_ad.droid.bleBuildScanFilter(filter_list)
91
92        self.cen_ad.droid.bleStartBleScan(filter_list, scan_settings,
93                                          scan_callback)
94        expected_event_name = scan_result.format(scan_callback)
95        test_result = True
96        try:
97            self.cen_ad.ed.pop_event(expected_event_name, self.DEFAULT_TIMEOUT)
98            self.log.info(
99                "Peripheral found with event: {}".format(expected_event_name))
100        except Empty:
101            self.log.info("Peripheral not advertising or not found: {}".format(
102                self.ble_mac_address))
103            test_result = False
104        self.cen_ad.droid.bleStopBleScan(scan_callback)
105        return test_result
106
107    def _unbond_device(self):
108        self.cen_ad.droid.bluetoothUnbond(self.ble_mac_address)
109        time.sleep(2)  #Grace timeout for unbonding to finish
110        bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices()
111        if bonded_devices:
112            self.log.error(
113                "Failed to unbond device... found: {}".format(bonded_devices))
114            return False
115        return True
116
117    @BluetoothBaseTest.bt_test_wrap
118    def test_gatt_connect_without_scanning(self):
119        """Test the round trip speed of connecting to a peripheral
120
121        This test will prompt the user to press "Enter" when the
122        peripheral is in a connecable advertisement state. Once
123        the user presses enter, this script will measure the amount
124        of time it takes to establish a GATT connection to the
125        peripheral. The test will then disconnect
126
127        Steps:
128        1. Wait for user input to confirm peripheral is advertising.
129        2. Start timer
130        3. Perform GATT connection to peripheral
131        4. Upon successful connection, stop timer
132        5. Disconnect from peripheral
133
134        Expected Result:
135        Device should be connected successfully
136
137        Returns:
138          Pass if True
139          Fail if False
140
141        TAGS: LE, GATT
142        Priority: 1
143        """
144        self.AUTOCONNECT = False
145        start_time = self._get_time_in_milliseconds()
146        try:
147            bluetooth_gatt, gatt_callback = (setup_gatt_connection(
148                self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
149                gatt_transport['le']))
150        except GattTestUtilsError as err:
151            self.log.error(err)
152            return False
153        end_time = self._get_time_in_milliseconds()
154        self.log.info("Total time (ms): {}".format(end_time - start_time))
155        try:
156            disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
157                                       gatt_callback)
158            self.cen_ad.droid.gattClientClose(bluetooth_gatt)
159        except GattTestUtilsError as err:
160            self.log.error(err)
161            return False
162        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
163
164    @BluetoothBaseTest.bt_test_wrap
165    def test_gatt_connect_stress(self):
166        """Test the round trip speed of connecting to a peripheral many times
167
168        This test will prompt the user to press "Enter" when the
169        peripheral is in a connecable advertisement state. Once
170        the user presses enter, this script will measure the amount
171        of time it takes to establish a GATT connection to the
172        peripheral. The test will then disconnect. It will attempt to
173        repeat this process multiple times.
174
175        Steps:
176        1. Wait for user input to confirm peripheral is advertising.
177        2. Start timer
178        3. Perform GATT connection to peripheral
179        4. Upon successful connection, stop timer
180        5. Disconnect from peripheral
181        6. Repeat steps 2-5 1000 times.
182
183        Expected Result:
184        Test should measure 1000 iterations of connect/disconnect cycles.
185
186        Returns:
187          Pass if True
188          Fail if False
189
190        TAGS: LE, GATT
191        Priority: 2
192        """
193        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
194            self.cen_ad.droid)
195        self.cen_ad.droid.bleStartBleScan(filter_list, scan_settings,
196                                          scan_callback)
197        self.AUTOCONNECT = False
198        iterations = 1000
199        n = 0
200        while n < iterations:
201            self.start_timer()
202            try:
203                bluetooth_gatt, gatt_callback = (setup_gatt_connection(
204                    self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
205                    gatt_transport['le']))
206            except GattTestUtilsError as err:
207                self.log.error(err)
208                return False
209            self.log.info("Total time (ms): {}".format(self.end_timer()))
210            try:
211                disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
212                                           gatt_callback)
213                self.cen_ad.droid.gattClientClose(bluetooth_gatt)
214            except GattTestUtilsError as err:
215                self.log.error(err)
216                return False
217            n += 1
218        return True
219
220    @BluetoothBaseTest.bt_test_wrap
221    def test_gatt_connect_iterate_uuids(self):
222        """Test the discovery of uuids of a peripheral
223
224        This test will prompt the user to press "Enter" when the
225        peripheral is in a connecable advertisement state. Once
226        the user presses enter, this script connects an Android device
227        to the periphal and attempt to discover all services,
228        characteristics, and descriptors.
229
230        Steps:
231        1. Wait for user input to confirm peripheral is advertising.
232        2. Perform GATT connection to peripheral
233        3. Upon successful connection, iterate through all services,
234        characteristics, and descriptors.
235        5. Disconnect from peripheral
236
237        Expected Result:
238        Device services, characteristics, and descriptors should all
239        be read.
240
241        Returns:
242          Pass if True
243          Fail if False
244
245        TAGS: LE, GATT
246        Priority: 2
247        """
248        try:
249            bluetooth_gatt, gatt_callback = (setup_gatt_connection(
250                self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
251                gatt_transport['le']))
252        except GattTestUtilsError as err:
253            self.log.error(err)
254            return False
255        if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt):
256            expected_event = gatt_cb_strings['gatt_serv_disc'].format(
257                gatt_callback)
258            try:
259                event = self.cen_ad.ed.pop_event(expected_event,
260                                                 self.DEFAULT_TIMEOUT)
261                discovered_services_index = event['data']['ServicesIndex']
262            except Empty:
263                self.log.error(
264                    gatt_cb_err['gatt_serv_disc'].format(expected_event))
265                return False
266            log_gatt_server_uuids(self.cen_ad, discovered_services_index)
267        try:
268            disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
269                                       gatt_callback)
270            self.cen_ad.droid.gattClientClose(bluetooth_gatt)
271        except GattTestUtilsError as err:
272            self.log.error(err)
273            return False
274        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
275        return True
276
277    @BluetoothBaseTest.bt_test_wrap
278    def test_pairing(self):
279        """Test pairing to a GATT mac address
280
281        This test will prompt the user to press "Enter" when the
282        peripheral is in a connecable advertisement state. Once
283        the user presses enter, this script will bond the Android device
284        to the peripheral.
285
286        Steps:
287        1. Wait for user input to confirm peripheral is advertising.
288        2. Perform Bluetooth pairing to GATT mac address
289        3. Upon successful bonding.
290        4. Unbond from device
291
292        Expected Result:
293        Device services, characteristics, and descriptors should all
294        be read.
295
296        Returns:
297          Pass if True
298          Fail if False
299
300        TAGS: LE, GATT
301        Priority: 1
302        """
303        if not self._pair_with_peripheral():
304            return False
305        self.cen_ad.droid.bluetoothUnbond(self.ble_mac_address)
306        return self._unbond_device()
307
308    @BluetoothBaseTest.bt_test_wrap
309    def test_pairing_stress(self):
310        """Test the round trip speed of pairing to a peripheral many times
311
312        This test will prompt the user to press "Enter" when the
313        peripheral is in a connecable advertisement state. Once
314        the user presses enter, this script will measure the amount
315        of time it takes to establish a pairing with a BLE device.
316
317        Steps:
318        1. Wait for user input to confirm peripheral is advertising.
319        2. Start timer
320        3. Perform Bluetooth pairing to GATT mac address
321        4. Upon successful bonding, stop timer.
322        5. Unbond from device
323        6. Repeat steps 2-5 100 times.
324
325        Expected Result:
326        Test should measure 100 iterations of bonding.
327
328        Returns:
329          Pass if True
330          Fail if False
331
332        TAGS: LE, GATT
333        Priority: 3
334        """
335        iterations = 100
336        for _ in range(iterations):
337            start_time = self.start_timer()
338            if not self._pair_with_peripheral():
339                return False
340            self.log.info("Total time (ms): {}".format(self.end_timer()))
341            if not self._unbond_device():
342                return False
343        return True
344
345    @BluetoothBaseTest.bt_test_wrap
346    def test_gatt_notification_longev(self):
347        """Test GATT characterisitic notifications for long periods of time
348
349        This test will prompt the user to press "Enter" when the
350        peripheral is in a connecable advertisement state. Once
351        the user presses enter, this script aims to set characteristic
352        notification to true on the config file's SERVICE_UUID,
353        NOTIFIABLE_CHAR_UUID, and CCC_DESC_UUID. This test assumes
354        the peripheral will constantly write data to a notifiable
355        characteristic.
356
357        Steps:
358        1. Wait for user input to confirm peripheral is advertising.
359        2. Perform Bluetooth pairing to GATT mac address
360        3. Perform a GATT connection to the periheral
361        4. Get the discovered service uuid that matches the user's input
362        in the config file
363        4. Write to the CCC descriptor to enable notifications
364        5. Enable notifications on the user's input Characteristic UUID
365        6. Continuously wait for Characteristic Changed events which
366        equate to recieving notifications for 15 minutes.
367
368        Expected Result:
369        There should be no disconnects and we should constantly receive
370        Characteristic Changed information. Values should vary upon user
371        interaction with the peripheral.
372
373        Returns:
374          Pass if True
375          Fail if False
376
377        TAGS: LE, GATT
378        Priority: 1
379        """
380        #pair devices
381        if not self._pair_with_peripheral():
382            return False
383        try:
384            bluetooth_gatt, gatt_callback = (setup_gatt_connection(
385                self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
386                gatt_transport['le']))
387        except GattTestUtilsError as err:
388            self.log.error(err)
389            return False
390        if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt):
391            expected_event = gatt_cb_strings['gatt_serv_disc'].format(
392                gatt_callback)
393            try:
394                event = self.cen_ad.ed.pop_event(expected_event,
395                                                 self.DEFAULT_TIMEOUT)
396                discovered_services_index = event['data']['ServicesIndex']
397            except Empty:
398                self.log.error(
399                    gatt_cb_err['gatt_serv_disc'].format(expected_event))
400                return False
401        # TODO: in setup save service_cound and discovered_services_index
402        # programatically
403        services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
404            discovered_services_index)
405        test_service_index = None
406        for i in range(services_count):
407            disc_service_uuid = (
408                self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
409                    discovered_services_index, i))
410            if disc_service_uuid == self.SERVICE_UUID:
411                test_service_index = i
412                break
413        if not test_service_index:
414            self.log.error("Service not found.")
415            return False
416
417        self.cen_ad.droid.gattClientDescriptorSetValue(
418            bluetooth_gatt, discovered_services_index, test_service_index,
419            self.NOTIFIABLE_CHAR_UUID, self.CCC_DESC_UUID,
420            gatt_descriptor['enable_notification_value'])
421
422        self.cen_ad.droid.gattClientWriteDescriptor(bluetooth_gatt,
423                                                    discovered_services_index,
424                                                    test_service_index,
425                                                    self.NOTIFIABLE_CHAR_UUID,
426                                                    self.CCC_DESC_UUID)
427
428        self.cen_ad.droid.gattClientSetCharacteristicNotification(
429            bluetooth_gatt, discovered_services_index, test_service_index,
430            self.NOTIFIABLE_CHAR_UUID, True)
431
432        # set 15 minute notification test time
433        notification_test_time = 900
434        end_time = time.time() + notification_test_time
435        expected_event = gatt_cb_strings['char_change'].format(gatt_callback)
436        while time.time() < end_time:
437            try:
438                event = self.cen_ad.ed.pop_event(expected_event,
439                                                 self.DEFAULT_TIMEOUT)
440                self.log.info(event)
441            except Empty as err:
442                print(err)
443                self.log.error(
444                    gatt_cb_err['char_change_err'].format(expected_event))
445                return False
446        return True
447