# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import avatar
import contextlib
import logging

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble.colors import color
from bumble.core import BT_AUDIO_SOURCE_SERVICE
from bumble.sdp import (
    SDP_ALL_ATTRIBUTES_RANGE,
    SDP_PUBLIC_BROWSE_ROOT,
    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
    Client as SdpClient,
    ServiceAttribute,
)
from mobly import base_test, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_none  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import assert_true  # type: ignore
from mobly.asserts import fail  # type: ignore
from typing import AsyncIterator, Optional


class SdpTest(base_test.BaseTestClass):  # type: ignore[misc]
    '''
    This class aims to test SDP on Classic Bluetooth devices.

    Every test opens a classic connection from REF to DUT, attaches a Bumble
    SDP client to the REF side of that connection, and tears both down again
    when the test body finishes, whether it passed or failed.
    '''

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: PandoraDevice

    @avatar.asynchronous
    async def setup_class(self) -> None:
        '''
        Create the Pandora devices, configure the Bumble ones, and reset DUT and REF.
        '''
        self.devices = PandoraDevices(self)
        self.dut, self.ref, *_ = self.devices

        # Enable BR/EDR mode and SSP for Bumble devices.
        # `setdefault` keeps any value already present in the device config.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)
                device.config.setdefault('classic_ssp_enabled', True)
                device.config.setdefault(
                    'server',
                    {
                        'io_capability': 'display_output_and_yes_no_input',
                    },
                )

        await asyncio.gather(self.dut.reset(), self.ref.reset())

    def teardown_class(self) -> None:
        '''
        Stop all devices, if `setup_class` got as far as creating them.
        '''
        if self.devices:
            self.devices.stop_all()

    @contextlib.asynccontextmanager
    async def _sdp_session(self) -> AsyncIterator[SdpClient]:
        '''
        Connect REF to DUT and yield an SDP client connected over that link.

        On exit the SDP client is disconnected (if it was connected) and the
        REF -> DUT connection is closed. This cleanup also runs when the body
        of the `async with` block raises, so a failing test does not leave a
        connection behind for the tests that follow.
        '''
        # Make classic connection
        ref_dut_res, dut_ref_res = await asyncio.gather(
            self.ref.aio.host.Connect(address=self.dut.address),
            self.dut.aio.host.WaitConnection(address=self.ref.address),
        )
        assert_is_not_none(ref_dut_res.connection)
        assert_is_not_none(dut_ref_res.connection)
        ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection
        assert ref_dut and dut_ref  # narrows the Optional types for type checkers

        try:
            # Get connection handle
            # NOTE(review): relies on REF exposing a Bumble `device`, with the
            # connection cookie holding the big-endian connection handle.
            connection_handle = int.from_bytes(ref_dut.cookie.value, 'big')
            connection = self.ref.device.lookup_connection(connection_handle)  # type: ignore

            # Connect to the SDP Server
            self.ref.log.info('Connecting to SDP Server')
            sdp_client = SdpClient(connection)  # type: ignore
            await sdp_client.connect()  # type: ignore
            self.ref.log.info('Connected to SDP Server')

            try:
                yield sdp_client
            finally:
                # SDP Client disconnect
                await sdp_client.disconnect()  # type: ignore
        finally:
            await self.ref.aio.host.Disconnect(connection=ref_dut)

    @avatar.asynchronous
    async def test_sdp_connect_and_disconnect(self) -> None:
        '''
        Check that an SDP client can connect to and disconnect from the DUT.
        '''
        async with self._sdp_session():
            # Nothing to do: connecting and disconnecting is the whole test.
            pass

    @avatar.asynchronous
    async def test_sdp_list_services_and_attributes(self) -> None:
        '''
        List the services of the public browse root and print all their attributes.
        '''
        async with self._sdp_session() as sdp_client:
            # List all services in the root browse group
            self.ref.log.info('Search Services')
            service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT])  # type: ignore
            assert_true(
                bool(service_record_handles),  # type: ignore
                'No service record found in the public browse root',
            )
            print(color('SERVICES:', 'yellow'), service_record_handles)  # type: ignore

            # For each service in the root browse group, get all its attributes
            for service_record_handle in service_record_handles:  # type: ignore
                attributes = await sdp_client.get_attributes(  # type: ignore
                    service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]  # type: ignore
                )
                print(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow'))
                for attribute in attributes:  # type: ignore
                    print(' ', attribute.to_string(with_colors=True))  # type: ignore

    @avatar.asynchronous
    async def test_sdp_search_verify_audio_source_service(self) -> None:
        '''
        Check that one service of the public browse root is the Audio Source service.
        '''
        async with self._sdp_session() as sdp_client:
            # List all services in the root browse group
            self.ref.log.info('Search Services')
            service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT])  # type: ignore
            assert_true(
                bool(service_record_handles),  # type: ignore
                'No service record found in the public browse root',
            )

            # Verify Audio Source service is present
            service_found = False
            for service_record_handle in service_record_handles:  # type: ignore
                attributes = await sdp_client.get_attributes(  # type: ignore
                    service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]  # type: ignore
                )
                service_found = any(
                    attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID  # type: ignore
                    and ServiceAttribute.is_uuid_in_value(BT_AUDIO_SOURCE_SERVICE, attribute.value)  # type: ignore
                    for attribute in attributes  # type: ignore
                )
                if service_found:
                    self.ref.log.info('Service found')
                    # One match is enough; skip the remaining SDP requests.
                    break
            assert_true(service_found, 'Audio Source service not found in the SDP records')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore