1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Setup:
18This test only requires two fuchsia devices.
19"""
20
21from acts import signals
22from acts.base_test import BaseTestClass
23from acts.controllers.fuchsia_lib.ssh import FuchsiaSSHError
24from acts.test_decorators import test_tracker_info
25from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size
26from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name
27from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name
28from acts_contrib.test_utils.fuchsia.bt_test_utils import unbond_all_known_devices
29from acts_contrib.test_utils.fuchsia.bt_test_utils import verify_device_state_by_name
30import time
31
32
33class BtFuchsiaEPTest(BaseTestClass):
34    ble_advertise_interval = 50
35    scan_timeout_seconds = 60
36    default_iterations = 1000
37    adv_name = generate_id_by_size(10)
38    test_adv_data = {
39        "name": adv_name,
40        "appearance": None,
41        "service_data": None,
42        "tx_power_level": None,
43        "service_uuids": None,
44        "manufacturer_data": None,
45        "uris": None,
46    }
47    test_connectable = True
48    test_scan_response = None
49
50    def setup_class(self):
51        super().setup_class()
52        for fd in self.fuchsia_devices:
53            fd.sl4f.bts_lib.initBluetoothSys()
54        self.pri_dut = self.fuchsia_devices[0]
55        self.sec_dut = self.fuchsia_devices[1]
56
57    def on_fail(self, test_name, begin_time):
58        for fd in self.fuchsia_devices:
59            fd.take_bug_report(test_name, begin_time)
60        self._unbond_all_known_devices()
61        self.sec_dut.sl4f.ble_lib.bleStopBleAdvertising()
62        self._kill_media_services()
63
64    def teardown_class(self):
65        self._kill_media_services()
66
67    def _kill_media_services(self):
68        """Kill any BT services related to A2DP/AVRCP on all Fuchsia devices.
69        """
70        ssh_timeout = 30
71        for fd in self.fuchsia_devices:
72            try:
73                fd.ssh.run("killall bt-a2dp*", timeout_sec=ssh_timeout)
74                fd.ssh.run("killall bt-avrcp*", timeout_sec=ssh_timeout)
75            except FuchsiaSSHError:
76                pass
77
78    def _unbond_all_known_devices(self):
79        """For all Fuchsia devices, unbond any known pairings.
80        """
81        time.sleep(5)
82        for fd in self.fuchsia_devices:
83            unbond_all_known_devices(fd, self.log)
84
85    def test_ble_awareness(self):
86        """Verify that Fuchsia devices can advertise and scan each other
87
88        Verify a Fuchsia device that starts a BLE advertisesement can be
89        found by a Fuchsia BLE scanner.
90
91        Steps:
92        1. On one Fuchsia device set an advertisement
93        2. On one Fuchsia device, scan for the advertisement by name
94
95        Expected Result:
96        Verify that there are no errors after each GATT connection.
97
98        Returns:
99          signals.TestPass if no errors
100          signals.TestFailure if there are any errors during the test.
101
102        TAGS: BLE
103        Priority: 0
104        """
105
106        self.sec_dut.sl4f.ble_lib.bleStartBleAdvertising(
107            self.test_adv_data, self.test_scan_response,
108            self.ble_advertise_interval, self.test_connectable)
109
110        device = le_scan_for_device_by_name(self.pri_dut, self.log,
111                                            self.adv_name,
112                                            self.scan_timeout_seconds)
113        self.sec_dut.sl4f.ble_lib.bleStopBleAdvertising()
114        if device is None:
115            raise signals.TestFailure("Scanner unable to find advertisement.")
116        raise signals.TestPass("Success")
117
118    def test_gatt_central_peripheral(self):
119        """Verify that Fuchsia devices can perform GATT operations
120
121        Verify a Fuchsia devices can perform GATT connections and interactions.
122
123        Steps:
124        1. On one Fuchsia device set an advertisement
125        2. On one Fuchsia device, scan for the advertisement by name
126        3. Perform GATT connection over LE
127        4. Pair both devices.
128        5. Perform GATT read/write operations.
129        6. Perform GATT disconnection.
130
131        Expected Result:
132        Verify that there are no errors after each GATT connection.
133
134        Returns:
135          signals.TestPass if no errors
136          signals.TestFailure if there are any errors during the test.
137
138        TAGS: BLE
139        Priority: 0
140        """
141        self._unbond_all_known_devices()
142
143        source_device_name = generate_id_by_size(10)
144        self.pri_dut.sl4f.bts_lib.setName(source_device_name)
145
146        self.sec_dut.sl4f.ble_lib.bleStartBleAdvertising(
147            self.test_adv_data, self.test_scan_response,
148            self.ble_advertise_interval, self.test_connectable)
149
150        device = le_scan_for_device_by_name(self.pri_dut, self.log,
151                                            self.adv_name,
152                                            self.scan_timeout_seconds)
153        if device is None:
154            raise signals.TestFailure("Scanner unable to find advertisement.")
155
156        connect_result = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral(
157            device["id"])
158        if connect_result.get("error") is not None:
159            raise signals.TestFailure("GATT Connection failed with: {}".format(
160                connect_result.get("error")))
161
162        if not verify_device_state_by_name(self.pri_dut, self.log,
163                                           self.adv_name, "CONNECTED", None):
164            raise signals.TestFailure(
165                "Failed to connect to device {}.".format(target_device_name))
166
167        if not verify_device_state_by_name(
168                self.sec_dut, self.log, source_device_name, "CONNECTED", None):
169            raise signals.TestFailure(
170                "Failed to connect to device {}.".format(source_device_name))
171
172        security_level = "ENCRYPTED"
173        non_bondable = False
174        transport = 2  #LE
175        self.pri_dut.sl4f.bts_lib.pair(device["id"], security_level,
176                                       non_bondable, transport)
177
178        services = None
179        if not verify_device_state_by_name(self.pri_dut, self.log,
180                                           self.adv_name, "BONDED", services):
181            raise signals.TestFailure(
182                "Failed to pair device {}.".format(target_device_name))
183
184        if not verify_device_state_by_name(self.sec_dut, self.log,
185                                           source_device_name, "BONDED",
186                                           services):
187            raise signals.TestFailure(
188                "Failed to pair device {}.".format(source_device_name))
189
190        disconnect_result = self.pri_dut.sl4f.gattc_lib.bleDisconnectPeripheral(
191            device["id"])
192        if disconnect_result.get("error") is not None:
193            raise signals.TestFailure(
194                "GATT Disconnection failed with: {}".format(
195                    connect_result.get("error")))
196
197        self.sec_dut.sl4f.ble_lib.bleStopBleAdvertising()
198
199        # TODO: Setup Proper GATT server and verify services published are found
200
201        raise signals.TestPass("Success")
202
203    def test_pairing_a2dp(self):
204        """Verify that Fuchsia devices can pair to each other and establish
205            an A2DP connection
206
207            Verify that Fuchsia devices can pair to each other and establish
208            an A2DP connection
209
210            Steps:
211            1. Clear out all bonded devices
212            2. Stop any A2DP services running on the device
213                Needed to take ownership of the services
214            3. Init sink and source opposite devices
215            4. Start pairing delegate for all Fuchsia devices
216            5. Set sink device to be discoverable
217            6. Discover sink device from source device
218            7. Connect to sink device from source device
219            8. Pair to sink device
220            9. Validate paired devices and services present
221
222            Expected Result:
223            Verify devices are successfully paired and appropriate a2dp
224            services are running.
225
226            Returns:
227            signals.TestPass if no errors
228            signals.TestFailure if there are any errors during the test.
229
230            TAGS: BREDR, A2DP
231            Priority: 0
232        """
233        self._unbond_all_known_devices()
234        self._kill_media_services()
235
236        source_device_name = generate_id_by_size(10)
237        target_device_name = generate_id_by_size(10)
238
239        self.pri_dut.sl4f.bts_lib.setName(source_device_name)
240        self.sec_dut.sl4f.bts_lib.setName(target_device_name)
241
242        input_capabilities = "NONE"
243        output_capabilities = "NONE"
244
245        # Initialize a2dp on both devices.
246        self.pri_dut.sl4f.avdtp_lib.init()
247        self.sec_dut.sl4f.avdtp_lib.init()
248
249        self.pri_dut.sl4f.bts_lib.acceptPairing(input_capabilities,
250                                                output_capabilities)
251
252        self.sec_dut.sl4f.bts_lib.acceptPairing(input_capabilities,
253                                                output_capabilities)
254        self.sec_dut.sl4f.bts_lib.setDiscoverable(True)
255
256        unique_mac_addr_id = bredr_scan_for_device_by_name(
257            self.pri_dut, self.log, target_device_name,
258            self.scan_timeout_seconds)
259
260        if not unique_mac_addr_id:
261            raise signals.TestFailure(
262                "Failed to find device {}.".format(target_device_name))
263
264        connect_result = self.pri_dut.sl4f.bts_lib.connectDevice(
265            unique_mac_addr_id)
266        if connect_result.get("error") is not None:
267            raise signals.TestFailure("Failed to connect with {}.".format(
268                connect_result.get("error")))
269
270        # We pair before checking the CONNECTED status because BR/EDR semantics
271        # were recently changed such that if pairing is not confirmed, then bt
272        # does not report connected = True.
273        security_level = "NONE"
274        bondable = True
275        transport = 1  #BREDR
276        pair_result = self.pri_dut.sl4f.bts_lib.pair(unique_mac_addr_id,
277                                                     security_level, bondable,
278                                                     transport)
279        if pair_result.get("error") is not None:
280            raise signals.TestFailure("Failed to pair with {}.".format(
281                pair_result.get("error")))
282
283        if not verify_device_state_by_name(
284                self.pri_dut, self.log, target_device_name, "CONNECTED", None):
285            raise signals.TestFailure(
286                "Failed to connect to device {}.".format(target_device_name))
287
288        if not verify_device_state_by_name(
289                self.sec_dut, self.log, source_device_name, "CONNECTED", None):
290            raise signals.TestFailure(
291                "Failed to connect to device {}.".format(source_device_name))
292
293        #TODO: Validation of services and paired devices (b/175641870)
294        # A2DP sink: 0000110b-0000-1000-8000-00805f9b34fb
295        # A2DP source: 0000110a-0000-1000-8000-00805f9b34fb
296        #TODO: Make an easy function for checking/updating devices
297        services = None
298        if not verify_device_state_by_name(self.pri_dut, self.log,
299                                           target_device_name, "BONDED",
300                                           services):
301            raise signals.TestFailure(
302                "Failed to pair device {}.".format(target_device_name))
303
304        if not verify_device_state_by_name(self.sec_dut, self.log,
305                                           source_device_name, "BONDED",
306                                           services):
307            raise signals.TestFailure(
308                "Failed to pair device {}.".format(source_device_name))
309
310        raise signals.TestPass("Success")
311