1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import logging
18
19from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
20from bumble.colors import color
21from bumble.core import BT_AUDIO_SOURCE_SERVICE
22from bumble.sdp import (
23    SDP_ALL_ATTRIBUTES_RANGE,
24    SDP_PUBLIC_BROWSE_ROOT,
25    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
26    Client as SdpClient,
27    ServiceAttribute,
28)
29from mobly import base_test, test_runner
30from mobly.asserts import assert_equal  # type: ignore
31from mobly.asserts import assert_in  # type: ignore
32from mobly.asserts import assert_is_none  # type: ignore
33from mobly.asserts import assert_is_not_none  # type: ignore
34from mobly.asserts import fail  # type: ignore
35from typing import Optional
36
37
38class SdpTest(base_test.BaseTestClass):  # type: ignore[misc]
39    '''
40    This class aim to test SDP on Classic Bluetooth devices.
41    '''
42
43    devices: Optional[PandoraDevices] = None
44
45    # pandora devices.
46    dut: PandoraDevice
47    ref: PandoraDevice
48
49    @avatar.asynchronous
50    async def setup_class(self) -> None:
51        self.devices = PandoraDevices(self)
52        self.dut, self.ref, *_ = self.devices
53
54        # Enable BR/EDR mode and SSP for Bumble devices.
55        for device in self.devices:
56            if isinstance(device, BumblePandoraDevice):
57                device.config.setdefault('classic_enabled', True)
58                device.config.setdefault('classic_ssp_enabled', True)
59                device.config.setdefault(
60                    'server',
61                    {
62                        'io_capability': 'display_output_and_yes_no_input',
63                    },
64                )
65
66        await asyncio.gather(self.dut.reset(), self.ref.reset())
67
68    def teardown_class(self) -> None:
69        if self.devices:
70            self.devices.stop_all()
71
72    @avatar.asynchronous
73    async def test_sdp_connect_and_disconnect(self) -> None:
74        # Pandora connection tokens
75        ref_dut, dut_ref = None, None
76
77        # Make classic connection
78        ref_dut_res, dut_ref_res = await asyncio.gather(
79            self.ref.aio.host.Connect(address=self.dut.address),
80            self.dut.aio.host.WaitConnection(address=self.ref.address),
81        )
82        assert_is_not_none(ref_dut_res.connection)
83        assert_is_not_none(dut_ref_res.connection)
84        ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection
85        assert ref_dut and dut_ref
86
87        # Get connection handle
88        connection_handle = int.from_bytes(ref_dut.cookie.value, 'big')
89        connection = self.ref.device.lookup_connection(connection_handle)  # type: ignore
90
91        # Connect to the SDP Server
92        self.ref.log.info(f'Connecting to SDP Server')
93        sdp_client = SdpClient(connection)  # type: ignore
94        await sdp_client.connect()  # type: ignore
95        self.ref.log.info(f'Connected to SDP Server')
96
97        # SDP Client disconnect
98        await sdp_client.disconnect()  # type: ignore
99        await self.ref.aio.host.Disconnect(connection=ref_dut)
100
101    @avatar.asynchronous
102    async def test_sdp_list_services_and_attributes(self) -> None:
103        # Pandora connection tokens
104        ref_dut, dut_ref = None, None
105
106        # Make classic connection
107        ref_dut_res, dut_ref_res = await asyncio.gather(
108            self.ref.aio.host.Connect(address=self.dut.address),
109            self.dut.aio.host.WaitConnection(address=self.ref.address),
110        )
111        assert_is_not_none(ref_dut_res.connection)
112        assert_is_not_none(dut_ref_res.connection)
113        ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection
114        assert ref_dut and dut_ref
115
116        # Get connection handle
117        connection_handle = int.from_bytes(ref_dut.cookie.value, 'big')
118        connection = self.ref.device.lookup_connection(connection_handle)  # type: ignore
119
120        # Connect to the SDP Server
121        self.ref.log.info(f'Connecting to SDP Server')
122        sdp_client = SdpClient(connection)  # type: ignore
123        await sdp_client.connect()  # type: ignore
124        self.ref.log.info(f'Connected to SDP Server')
125
126        # List all services in the root browse group
127        self.ref.log.info(f'Search Services')
128        service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT])  # type: ignore
129        assert bool(service_record_handles)  # type: ignore
130        print(color('SERVICES:', 'yellow'), service_record_handles)  # type: ignore
131
132        # For each service in the root browse group, get all its attributes
133        for service_record_handle in service_record_handles:  # type: ignore
134            attributes = await sdp_client.get_attributes(  # type: ignore
135                service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]  # type: ignore
136            )
137            print(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow'))
138            for attribute in attributes:  # type: ignore
139                print('  ', attribute.to_string(with_colors=True))  # type: ignore
140
141        # Sdp client disconnect
142        await sdp_client.disconnect()  # type: ignore
143        await self.ref.aio.host.Disconnect(connection=ref_dut)
144
145    @avatar.asynchronous
146    async def test_sdp_search_verify_audio_source_service(self) -> None:
147        # Pandora connection tokens
148        ref_dut, dut_ref = None, None
149
150        # Make classic connection
151        ref_dut_res, dut_ref_res = await asyncio.gather(
152            self.ref.aio.host.Connect(address=self.dut.address),
153            self.dut.aio.host.WaitConnection(address=self.ref.address),
154        )
155        assert_is_not_none(ref_dut_res.connection)
156        assert_is_not_none(dut_ref_res.connection)
157        ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection
158        assert ref_dut and dut_ref
159
160        # Get connection handle
161        connection_handle = int.from_bytes(ref_dut.cookie.value, 'big')
162        connection = self.ref.device.lookup_connection(connection_handle)  # type: ignore
163
164        # Connect to the SDP Server
165        self.ref.log.info(f'Connecting to SDP Server')
166        sdp_client = SdpClient(connection)  # type: ignore
167        await sdp_client.connect()  # type: ignore
168        self.ref.log.info(f'Connected to SDP Server')
169
170        # List all services in the root browse group
171        self.ref.log.info(f'Search Services')
172        service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT])  # type: ignore
173        assert bool(service_record_handles)  # type: ignore
174
175        # Verify Audio Source service is present
176        service_found = False
177        for service_record_handle in service_record_handles:  # type: ignore
178            attributes = await sdp_client.get_attributes(  # type: ignore
179                service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]  # type: ignore
180            )
181            for attribute in attributes:  # type: ignore
182                if attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:  # type: ignore
183                    if ServiceAttribute.is_uuid_in_value(BT_AUDIO_SOURCE_SERVICE, attribute.value):  # type: ignore
184                        service_found = True
185                        self.ref.log.info(f'Service found')
186        assert service_found
187
188        # SDP Client disconnect
189        await sdp_client.disconnect()  # type: ignore
190        await self.ref.aio.host.Disconnect(connection=ref_dut)
191
192
193if __name__ == '__main__':
194    logging.basicConfig(level=logging.DEBUG)
195    test_runner.main()  # type: ignore
196