1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""This is the PTS base class that is inherited from all PTS
17Tests.
18"""
19
20import re
21import time
22import traceback
23
24from ctypes import *
25
26from acts import signals
27from acts.base_test import BaseTestClass
28from acts.controllers.bluetooth_pts_device import VERDICT_STRINGS
29from acts.controllers.fuchsia_device import FuchsiaDevice
30from acts.signals import TestSignal
31from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
32from acts_contrib.test_utils.bt.bt_constants import gatt_transport
33from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name
34
35
36class PtsBaseClass(BaseTestClass):
37    """ Class for representing common functionality across all PTS tests.
38
39    This includes the ability to rerun tests due to PTS instability,
40    common PTS action mappings, and setup/teardown related devices.
41
42    """
43    scan_timeout_seconds = 10
44    peer_identifier = None
45
46    def setup_class(self):
47        super().setup_class()
48        if 'dut' in self.user_params:
49            if self.user_params['dut'] == 'fuchsia_devices':
50                self.dut = create_bluetooth_device(self.fuchsia_devices[0])
51            elif self.user_params['dut'] == 'android_devices':
52                self.dut = create_bluetooth_device(self.android_devices[0])
53            else:
54                raise ValueError('Invalid DUT specified in config. (%s)' %
55                                 self.user_params['dut'])
56        else:
57            # Default is an fuchsia device
58            self.dut = create_bluetooth_device(self.fuchsia_devices[0])
59
60        self.characteristic_read_not_permitted_uuid = self.user_params.get(
61            "characteristic_read_not_permitted_uuid")
62        self.characteristic_read_not_permitted_handle = self.user_params.get(
63            "characteristic_read_not_permitted_handle")
64        self.characteristic_read_invalid_handle = self.user_params.get(
65            "characteristic_read_invalid_handle")
66        self.characteristic_attribute_not_found_uuid = self.user_params.get(
67            "characteristic_attribute_not_found_uuid")
68        self.write_characteristic_not_permitted_handle = self.user_params.get(
69            "write_characteristic_not_permitted_handle")
70
71        self.pts = self.bluetooth_pts_device[0]
72        # MMI functions commented out until implemented. Added for tracking
73        # purposes.
74        self.pts_action_mapping = {
75            "A2DP": {
76                1: self.a2dp_mmi_iut_connectable,
77                1002: self.a2dp_mmi_iut_accept_connect,
78                1020: self.a2dp_mmi_initiate_open_stream,
79            },
80            "GATT": {
81                1: self.mmi_make_iut_connectable,
82                2: self.mmi_iut_initiate_connection,
83                3: self.mmi_iut_initiate_disconnection,
84                # 4: self.mmi_iut_no_security,
85                # 5: self.mmi_iut_initiate_br_connection,
86                10: self.mmi_discover_primary_service,
87                # 11: self.mmi_confirm_no_primary_service_small,
88                # 12: self.mmi_iut_mtu_exchange,
89                # 13: self.mmi_discover_all_service_record,
90                # 14: self.mmi_iut_discover_gatt_service_record,
91                15: self.mmi_iut_find_included_services,
92                # 16: self.mmi_confirm_no_characteristic_uuid_small,
93                17: self.mmi_confirm_primary_service,
94                # 18: self.mmi_send_primary_service_uuid,
95                # 19: self.mmi_confirm_primary_service_uuid,
96                # 22: self.confirm_primary_service_1801,
97                24: self.mmi_confirm_include_service,
98                26: self.mmi_confirm_characteristic_service,
99                # 27: self.perform_read_all_characteristics,
100                29: self.
101                mmi_discover_service_uuid_range,  # AKA: discover service by uuid
102                # 31: self.perform_read_all_descriptors,
103                48: self.mmi_iut_send_read_characteristic_handle,
104                58: self.mmi_iut_send_read_descriptor_handle,
105                70: self.mmi_send_write_command,
106                74: self.mmi_send_write_request,
107                76: self.mmi_send_prepare_write,
108                77: self.mmi_iut_send_prepare_write_greater_offset,
109                80: self.mmi_iut_send_prepare_write_greater,
110                110: self.mmi_iut_enter_handle_read_not_permitted,
111                111: self.mmi_iut_enter_uuid_read_not_permitted,
112                118: self.mmi_iut_enter_handle_invalid,
113                119: self.mmi_iut_enter_uuid_attribute_not_found,
114                120: self.mmi_iut_enter_handle_write_not_permitted,
115                2000: self.mmi_verify_secure_id,  # Enter pairing pin from DUT.
116            },
117            "SDP": {
118                # TODO: Implement MMIs as necessary
119            }
120        }
121        self.pts.bind_to(self.process_next_action)
122
123    def teardown_class(self):
124        self.pts.clean_up()
125
126    def setup_test(self):
127        # Always start the test with RESULT_INCOMP
128        self.pts.pts_test_result = VERDICT_STRINGS['RESULT_INCOMP']
129
130    def teardown_test(self):
131        return True
132
133    @staticmethod
134    def pts_test_wrap(fn):
135        def _safe_wrap_test_case(self, *args, **kwargs):
136            test_id = "{}:{}:{}".format(self.__class__.__name__, fn.__name__,
137                                        time.time())
138            log_string = "[Test ID] {}".format(test_id)
139            self.log.info(log_string)
140            try:
141                self.dut.log_info("Started " + log_string)
142                result = fn(self, *args, **kwargs)
143                self.dut.log_info("Finished " + log_string)
144                rerun_count = self.user_params.get("pts_auto_rerun_count", 0)
145                for i in range(int(rerun_count)):
146                    if result is not True:
147                        self.teardown_test()
148                        log_string = "[Rerun Test ID] {}. Run #{} run failed... Retrying".format(
149                            test_id, i + 1)
150                        self.log.info(log_string)
151                        self.setup_test()
152                        self.dut.log_info("Rerun Started " + log_string)
153                        result = fn(self, *args, **kwargs)
154                    else:
155                        return result
156                return result
157            except TestSignal:
158                raise
159            except Exception as e:
160                self.log.error(traceback.format_exc())
161                self.log.error(str(e))
162                raise
163            return fn(self, *args, **kwargs)
164
165        return _safe_wrap_test_case
166
167    def process_next_action(self, action):
168        func = self.pts_action_mapping.get(
169            self.pts.pts_profile_mmi_request).get(action, "Nothing")
170        if func != 'Nothing':
171            func()
172
173    ### BEGIN A2DP MMI Actions ###
174
175    def a2dp_mmi_iut_connectable(self):
176        self.dut.start_profile_a2dp_sink()
177        self.dut.set_discoverable(True)
178
179    def a2dp_mmi_iut_accept_connect(self):
180        self.dut.start_profile_a2dp_sink()
181        self.dut.set_discoverable(True)
182
183    def a2dp_mmi_initiate_open_stream(self):
184        self.dut.a2dp_initiate_open_stream()
185
186    ### END A2DP MMI Actions ###
187
188    ### BEGIN GATT MMI Actions ###
189
190    def create_write_value_by_size(self, size):
191        write_value = []
192        for i in range(size):
193            write_value.append(i % 256)
194        return write_value
195
196    def mmi_send_write_command(self):
197        description_to_parse = self.pts.current_implicit_send_description
198        raw_handle = re.search('handle = \'(.*)\'O with', description_to_parse)
199        handle = int(raw_handle.group(1), 16)
200        raw_size = re.search('with <= \'(.*)\' byte', description_to_parse)
201        size = int(raw_size.group(1))
202        self.dut.gatt_client_write_characteristic_without_response_by_handle(
203            self.peer_identifier, handle,
204            self.create_write_value_by_size(size))
205
206    def mmi_send_write_request(self):
207        description_to_parse = self.pts.current_implicit_send_description
208        raw_handle = re.search('handle = \'(.*)\'O with', description_to_parse)
209        handle = int(raw_handle.group(1), 16)
210        raw_size = re.search('with <= \'(.*)\' byte', description_to_parse)
211        size = int(raw_size.group(1))
212        offset = 0
213        self.dut.gatt_client_write_characteristic_by_handle(
214            self.peer_identifier, handle, offset,
215            self.create_write_value_by_size(size))
216
217    def mmi_send_prepare_write(self):
218        description_to_parse = self.pts.current_implicit_send_description
219        raw_handle = re.search('handle = \'(.*)\'O <=', description_to_parse)
220        handle = int(raw_handle.group(1), 16)
221        raw_size = re.search('<= \'(.*)\' byte', description_to_parse)
222        size = int(math.floor(int(raw_size.group(1)) / 2))
223        offset = int(size / 2)
224        self.dut.gatt_client_write_characteristic_by_handle(
225            self.peer_identifier, handle, offset,
226            self.create_write_value_by_size(size))
227
228    def mmi_iut_send_prepare_write_greater_offset(self):
229        description_to_parse = self.pts.current_implicit_send_description
230        raw_handle = re.search('handle = \'(.*)\'O and', description_to_parse)
231        handle = int(raw_handle.group(1), 16)
232        raw_offset = re.search('greater than \'(.*)\' byte',
233                               description_to_parse)
234        offset = int(raw_offset.group(1))
235        size = 1
236        self.dut.gatt_client_write_characteristic_by_handle(
237            self.peer_identifier, handle, offset,
238            self.create_write_value_by_size(size))
239
240    def mmi_iut_send_prepare_write_greater(self):
241        description_to_parse = self.pts.current_implicit_send_description
242        raw_handle = re.search('handle = \'(.*)\'O with', description_to_parse)
243        handle = int(raw_handle.group(1), 16)
244        raw_size = re.search('greater than \'(.*)\' byte',
245                             description_to_parse)
246        size = int(raw_size.group(1))
247        offset = 0
248        self.dut.gatt_client_write_characteristic_by_handle(
249            self.peer_identifier, handle, offset,
250            self.create_write_value_by_size(size))
251
252    def mmi_make_iut_connectable(self):
253        adv_data = {
254            "name": fuchsia_name,
255            "appearance": None,
256            "service_data": None,
257            "tx_power_level": None,
258            "service_uuids": None,
259            "manufacturer_data": None,
260            "uris": None,
261        }
262        scan_response = None
263        connectable = True
264        interval = 1000
265
266        self.dut.start_le_advertisement(adv_data, scan_response, interval,
267                                        connectable)
268
269    def mmi_iut_enter_uuid_read_not_permitted(self):
270        self.pts.extra_answers.append(
271            self.characteristic_read_not_permitted_uuid)
272
273    def mmi_iut_enter_handle_read_not_permitted(self):
274        self.pts.extra_answers.append(
275            self.characteristic_read_not_permitted_handle)
276
277    def mmi_iut_enter_handle_invalid(self):
278        self.pts.extra_answers.append(self.characteristic_read_invalid_handle)
279
280    def mmi_iut_enter_uuid_attribute_not_found(self):
281        self.pts.extra_answers.append(
282            self.characteristic_attribute_not_found_uuid)
283
284    def mmi_iut_enter_handle_write_not_permitted(self):
285        self.pts.extra_answers.append(
286            self.write_characteristic_not_permitted_handle)
287
288    def mmi_verify_secure_id(self):
289        self.pts.extra_answers.append(self.dut.get_pairing_pin())
290
291    def mmi_discover_service_uuid_range(self, uuid):
292        self.dut.gatt_client_mmi_discover_service_uuid_range(
293            self.peer_identifier, uuid)
294
295    def mmi_iut_initiate_connection(self):
296        autoconnect = False
297        transport = gatt_transport['le']
298        adv_name = "PTS"
299        self.peer_identifier = self.dut.le_scan_with_name_filter(
300            "PTS", self.scan_timeout_seconds)
301        if self.peer_identifier is None:
302            raise signals.TestFailure("Scanner unable to find advertisement.")
303        tries = 3
304        for _ in range(tries):
305            if self.dut.gatt_connect(self.peer_identifier, transport,
306                                     autoconnect):
307                return
308
309        raise signals.TestFailure("Unable to connect to peripheral.")
310
311    def mmi_iut_initiate_disconnection(self):
312        if not self.dut.gatt_disconnect(self.peer_identifier):
313            raise signals.TestFailure("Failed to disconnect from peer.")
314
315    def mmi_discover_primary_service(self):
316        self.dut.gatt_refresh()
317
318    def mmi_iut_find_included_services(self):
319        self.dut.gatt_refresh()
320
321        test_result = self.pts.execute_test(test_name)
322        return test_result
323
324    def mmi_confirm_primary_service(self):
325        # TODO: Write verifier that 1800 and 1801 exists. For now just pass.
326        return True
327
328    def mmi_confirm_characteristic_service(self):
329        # TODO: Write verifier that no services exist. For now just pass.
330        return True
331
332    def mmi_confirm_include_service(self, uuid_description):
333        # TODO: Write verifier that input services exist. For now just pass.
334        # Note: List comes in the form of a long string to parse:
335        # Attribute Handle = '0002'O Included Service Attribute handle = '0080'O,End Group Handle = '0085'O,Service UUID = 'A00B'O
336        # \n
337        # Attribute Handle = '0021'O Included Service Attribute handle = '0001'O,End Group Handle = '0006'O,Service UUID = 'A00D'O
338        # \n ...
339        return True
340
341    def mmi_iut_send_read_characteristic_handle(self):
342        description_to_parse = self.pts.current_implicit_send_description
343        raw_handle = re.search('handle = \'(.*)\'O to', description_to_parse)
344        handle = int(raw_handle.group(1), 16)
345        self.dut.gatt_client_read_characteristic_by_handle(
346            self.peer_identifier, handle)
347
348    def mmi_iut_send_read_descriptor_handle(self):
349        description_to_parse = self.pts.current_implicit_send_description
350        raw_handle = re.search('handle = \'(.*)\'O to', description_to_parse)
351        handle = int(raw_handle.group(1), 16)
352        self.dut.gatt_client_descriptor_read_by_handle(self.peer_identifier,
353                                                       handle)
354
355    ### END GATT MMI Actions ###
356