1#!/usr/bin/env python3 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Setup: 18This test only requires two fuchsia devices. 19""" 20 21from acts import signals 22from acts.base_test import BaseTestClass 23from acts.controllers.fuchsia_lib.ssh import FuchsiaSSHError 24from acts.test_decorators import test_tracker_info 25from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size 26from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name 27from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name 28from acts_contrib.test_utils.fuchsia.bt_test_utils import unbond_all_known_devices 29from acts_contrib.test_utils.fuchsia.bt_test_utils import verify_device_state_by_name 30import time 31 32 33class BtFuchsiaEPTest(BaseTestClass): 34 ble_advertise_interval = 50 35 scan_timeout_seconds = 60 36 default_iterations = 1000 37 adv_name = generate_id_by_size(10) 38 test_adv_data = { 39 "name": adv_name, 40 "appearance": None, 41 "service_data": None, 42 "tx_power_level": None, 43 "service_uuids": None, 44 "manufacturer_data": None, 45 "uris": None, 46 } 47 test_connectable = True 48 test_scan_response = None 49 50 def setup_class(self): 51 super().setup_class() 52 for fd in self.fuchsia_devices: 53 fd.sl4f.bts_lib.initBluetoothSys() 54 self.pri_dut = self.fuchsia_devices[0] 55 self.sec_dut = self.fuchsia_devices[1] 56 57 def on_fail(self, test_name, begin_time): 58 for fd in self.fuchsia_devices: 59 fd.take_bug_report(test_name, begin_time) 60 self._unbond_all_known_devices() 61 self.sec_dut.sl4f.ble_lib.bleStopBleAdvertising() 62 self._kill_media_services() 63 64 def teardown_class(self): 65 self._kill_media_services() 66 67 def _kill_media_services(self): 68 """Kill any BT services related to A2DP/AVRCP on all Fuchsia devices. 69 """ 70 ssh_timeout = 30 71 for fd in self.fuchsia_devices: 72 try: 73 fd.ssh.run("killall bt-a2dp*", timeout_sec=ssh_timeout) 74 fd.ssh.run("killall bt-avrcp*", timeout_sec=ssh_timeout) 75 except FuchsiaSSHError: 76 pass 77 78 def _unbond_all_known_devices(self): 79 """For all Fuchsia devices, unbond any known pairings. 80 """ 81 time.sleep(5) 82 for fd in self.fuchsia_devices: 83 unbond_all_known_devices(fd, self.log) 84 85 def test_ble_awareness(self): 86 """Verify that Fuchsia devices can advertise and scan each other 87 88 Verify a Fuchsia device that starts a BLE advertisesement can be 89 found by a Fuchsia BLE scanner. 90 91 Steps: 92 1. On one Fuchsia device set an advertisement 93 2. On one Fuchsia device, scan for the advertisement by name 94 95 Expected Result: 96 Verify that there are no errors after each GATT connection. 97 98 Returns: 99 signals.TestPass if no errors 100 signals.TestFailure if there are any errors during the test. 101 102 TAGS: BLE 103 Priority: 0 104 """ 105 106 self.sec_dut.sl4f.ble_lib.bleStartBleAdvertising( 107 self.test_adv_data, self.test_scan_response, 108 self.ble_advertise_interval, self.test_connectable) 109 110 device = le_scan_for_device_by_name(self.pri_dut, self.log, 111 self.adv_name, 112 self.scan_timeout_seconds) 113 self.sec_dut.sl4f.ble_lib.bleStopBleAdvertising() 114 if device is None: 115 raise signals.TestFailure("Scanner unable to find advertisement.") 116 raise signals.TestPass("Success") 117 118 def test_gatt_central_peripheral(self): 119 """Verify that Fuchsia devices can perform GATT operations 120 121 Verify a Fuchsia devices can perform GATT connections and interactions. 122 123 Steps: 124 1. On one Fuchsia device set an advertisement 125 2. On one Fuchsia device, scan for the advertisement by name 126 3. Perform GATT connection over LE 127 4. Pair both devices. 128 5. Perform GATT read/write operations. 129 6. Perform GATT disconnection. 130 131 Expected Result: 132 Verify that there are no errors after each GATT connection. 133 134 Returns: 135 signals.TestPass if no errors 136 signals.TestFailure if there are any errors during the test. 137 138 TAGS: BLE 139 Priority: 0 140 """ 141 self._unbond_all_known_devices() 142 143 source_device_name = generate_id_by_size(10) 144 self.pri_dut.sl4f.bts_lib.setName(source_device_name) 145 146 self.sec_dut.sl4f.ble_lib.bleStartBleAdvertising( 147 self.test_adv_data, self.test_scan_response, 148 self.ble_advertise_interval, self.test_connectable) 149 150 device = le_scan_for_device_by_name(self.pri_dut, self.log, 151 self.adv_name, 152 self.scan_timeout_seconds) 153 if device is None: 154 raise signals.TestFailure("Scanner unable to find advertisement.") 155 156 connect_result = self.pri_dut.sl4f.gattc_lib.bleConnectToPeripheral( 157 device["id"]) 158 if connect_result.get("error") is not None: 159 raise signals.TestFailure("GATT Connection failed with: {}".format( 160 connect_result.get("error"))) 161 162 if not verify_device_state_by_name(self.pri_dut, self.log, 163 self.adv_name, "CONNECTED", None): 164 raise signals.TestFailure( 165 "Failed to connect to device {}.".format(target_device_name)) 166 167 if not verify_device_state_by_name( 168 self.sec_dut, self.log, source_device_name, "CONNECTED", None): 169 raise signals.TestFailure( 170 "Failed to connect to device {}.".format(source_device_name)) 171 172 security_level = "ENCRYPTED" 173 non_bondable = False 174 transport = 2 #LE 175 self.pri_dut.sl4f.bts_lib.pair(device["id"], security_level, 176 non_bondable, transport) 177 178 services = None 179 if not verify_device_state_by_name(self.pri_dut, self.log, 180 self.adv_name, "BONDED", services): 181 raise signals.TestFailure( 182 "Failed to pair device {}.".format(target_device_name)) 183 184 if not verify_device_state_by_name(self.sec_dut, self.log, 185 source_device_name, "BONDED", 186 services): 187 raise signals.TestFailure( 188 "Failed to pair device {}.".format(source_device_name)) 189 190 disconnect_result = self.pri_dut.sl4f.gattc_lib.bleDisconnectPeripheral( 191 device["id"]) 192 if disconnect_result.get("error") is not None: 193 raise signals.TestFailure( 194 "GATT Disconnection failed with: {}".format( 195 connect_result.get("error"))) 196 197 self.sec_dut.sl4f.ble_lib.bleStopBleAdvertising() 198 199 # TODO: Setup Proper GATT server and verify services published are found 200 201 raise signals.TestPass("Success") 202 203 def test_pairing_a2dp(self): 204 """Verify that Fuchsia devices can pair to each other and establish 205 an A2DP connection 206 207 Verify that Fuchsia devices can pair to each other and establish 208 an A2DP connection 209 210 Steps: 211 1. Clear out all bonded devices 212 2. Stop any A2DP services running on the device 213 Needed to take ownership of the services 214 3. Init sink and source opposite devices 215 4. Start pairing delegate for all Fuchsia devices 216 5. Set sink device to be discoverable 217 6. Discover sink device from source device 218 7. Connect to sink device from source device 219 8. Pair to sink device 220 9. Validate paired devices and services present 221 222 Expected Result: 223 Verify devices are successfully paired and appropriate a2dp 224 services are running. 225 226 Returns: 227 signals.TestPass if no errors 228 signals.TestFailure if there are any errors during the test. 229 230 TAGS: BREDR, A2DP 231 Priority: 0 232 """ 233 self._unbond_all_known_devices() 234 self._kill_media_services() 235 236 source_device_name = generate_id_by_size(10) 237 target_device_name = generate_id_by_size(10) 238 239 self.pri_dut.sl4f.bts_lib.setName(source_device_name) 240 self.sec_dut.sl4f.bts_lib.setName(target_device_name) 241 242 input_capabilities = "NONE" 243 output_capabilities = "NONE" 244 245 # Initialize a2dp on both devices. 246 self.pri_dut.sl4f.avdtp_lib.init() 247 self.sec_dut.sl4f.avdtp_lib.init() 248 249 self.pri_dut.sl4f.bts_lib.acceptPairing(input_capabilities, 250 output_capabilities) 251 252 self.sec_dut.sl4f.bts_lib.acceptPairing(input_capabilities, 253 output_capabilities) 254 self.sec_dut.sl4f.bts_lib.setDiscoverable(True) 255 256 unique_mac_addr_id = bredr_scan_for_device_by_name( 257 self.pri_dut, self.log, target_device_name, 258 self.scan_timeout_seconds) 259 260 if not unique_mac_addr_id: 261 raise signals.TestFailure( 262 "Failed to find device {}.".format(target_device_name)) 263 264 connect_result = self.pri_dut.sl4f.bts_lib.connectDevice( 265 unique_mac_addr_id) 266 if connect_result.get("error") is not None: 267 raise signals.TestFailure("Failed to connect with {}.".format( 268 connect_result.get("error"))) 269 270 # We pair before checking the CONNECTED status because BR/EDR semantics 271 # were recently changed such that if pairing is not confirmed, then bt 272 # does not report connected = True. 273 security_level = "NONE" 274 bondable = True 275 transport = 1 #BREDR 276 pair_result = self.pri_dut.sl4f.bts_lib.pair(unique_mac_addr_id, 277 security_level, bondable, 278 transport) 279 if pair_result.get("error") is not None: 280 raise signals.TestFailure("Failed to pair with {}.".format( 281 pair_result.get("error"))) 282 283 if not verify_device_state_by_name( 284 self.pri_dut, self.log, target_device_name, "CONNECTED", None): 285 raise signals.TestFailure( 286 "Failed to connect to device {}.".format(target_device_name)) 287 288 if not verify_device_state_by_name( 289 self.sec_dut, self.log, source_device_name, "CONNECTED", None): 290 raise signals.TestFailure( 291 "Failed to connect to device {}.".format(source_device_name)) 292 293 #TODO: Validation of services and paired devices (b/175641870) 294 # A2DP sink: 0000110b-0000-1000-8000-00805f9b34fb 295 # A2DP source: 0000110a-0000-1000-8000-00805f9b34fb 296 #TODO: Make an easy function for checking/updating devices 297 services = None 298 if not verify_device_state_by_name(self.pri_dut, self.log, 299 target_device_name, "BONDED", 300 services): 301 raise signals.TestFailure( 302 "Failed to pair device {}.".format(target_device_name)) 303 304 if not verify_device_state_by_name(self.sec_dut, self.log, 305 source_device_name, "BONDED", 306 services): 307 raise signals.TestFailure( 308 "Failed to pair device {}.".format(source_device_name)) 309 310 raise signals.TestPass("Success") 311