#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""This is the PTS base class that is inherited by all PTS tests."""

import re
import time
import traceback

from ctypes import *

from acts import signals
from acts.base_test import BaseTestClass
from acts.controllers.bluetooth_pts_device import VERDICT_STRINGS
from acts.controllers.fuchsia_device import FuchsiaDevice
from acts.signals import TestSignal
from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
from acts_contrib.test_utils.bt.bt_constants import gatt_transport
from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name


class PtsBaseClass(BaseTestClass):
    """ Class for representing common functionality across all PTS tests.

    This includes the ability to rerun tests due to PTS instability,
    common PTS action mappings, and setup/teardown related devices.

    """
    # Timeout, in seconds, passed to the DUT's LE scan when looking for PTS.
    scan_timeout_seconds = 10
    # Identifier of the PTS peer; populated by mmi_iut_initiate_connection.
    peer_identifier = None

    def setup_class(self):
        """Select the DUT, read PTS-related user params and build the MMI map.

        The DUT is chosen from the 'dut' user param ('fuchsia_devices' or
        'android_devices'); when the param is absent the first Fuchsia device
        is used.

        Raises:
            ValueError: if the 'dut' user param holds an unsupported value.
        """
        super().setup_class()
        # Default is a Fuchsia device.
        dut_type = self.user_params.get('dut', 'fuchsia_devices')
        if dut_type == 'fuchsia_devices':
            self.dut = create_bluetooth_device(self.fuchsia_devices[0])
        elif dut_type == 'android_devices':
            self.dut = create_bluetooth_device(self.android_devices[0])
        else:
            raise ValueError('Invalid DUT specified in config. (%s)' %
                             dut_type)

        # Optional answers for PTS prompts; each is None when not configured.
        self.characteristic_read_not_permitted_uuid = self.user_params.get(
            "characteristic_read_not_permitted_uuid")
        self.characteristic_read_not_permitted_handle = self.user_params.get(
            "characteristic_read_not_permitted_handle")
        self.characteristic_read_invalid_handle = self.user_params.get(
            "characteristic_read_invalid_handle")
        self.characteristic_attribute_not_found_uuid = self.user_params.get(
            "characteristic_attribute_not_found_uuid")
        self.write_characteristic_not_permitted_handle = self.user_params.get(
            "write_characteristic_not_permitted_handle")

        self.pts = self.bluetooth_pts_device[0]
        # Maps profile name -> MMI id -> handler. process_next_action calls
        # the handler with no arguments.
        # MMI functions commented out until implemented. Added for tracking
        # purposes.
        self.pts_action_mapping = {
            "A2DP": {
                1: self.a2dp_mmi_iut_connectable,
                1002: self.a2dp_mmi_iut_accept_connect,
                1020: self.a2dp_mmi_initiate_open_stream,
            },
            "GATT": {
                1: self.mmi_make_iut_connectable,
                2: self.mmi_iut_initiate_connection,
                3: self.mmi_iut_initiate_disconnection,
                # 4: self.mmi_iut_no_security,
                # 5: self.mmi_iut_initiate_br_connection,
                10: self.mmi_discover_primary_service,
                # 11: self.mmi_confirm_no_primary_service_small,
                # 12: self.mmi_iut_mtu_exchange,
                # 13: self.mmi_discover_all_service_record,
                # 14: self.mmi_iut_discover_gatt_service_record,
                15: self.mmi_iut_find_included_services,
                # 16: self.mmi_confirm_no_characteristic_uuid_small,
                17: self.mmi_confirm_primary_service,
                # 18: self.mmi_send_primary_service_uuid,
                # 19: self.mmi_confirm_primary_service_uuid,
                # 22: self.confirm_primary_service_1801,
                24: self.mmi_confirm_include_service,
                26: self.mmi_confirm_characteristic_service,
                # 27: self.perform_read_all_characteristics,
                # AKA: discover service by uuid
                29: self.mmi_discover_service_uuid_range,
                # 31: self.perform_read_all_descriptors,
                48: self.mmi_iut_send_read_characteristic_handle,
                58: self.mmi_iut_send_read_descriptor_handle,
                70: self.mmi_send_write_command,
                74: self.mmi_send_write_request,
                76: self.mmi_send_prepare_write,
                77: self.mmi_iut_send_prepare_write_greater_offset,
                80: self.mmi_iut_send_prepare_write_greater,
                110: self.mmi_iut_enter_handle_read_not_permitted,
                111: self.mmi_iut_enter_uuid_read_not_permitted,
                118: self.mmi_iut_enter_handle_invalid,
                119: self.mmi_iut_enter_uuid_attribute_not_found,
                120: self.mmi_iut_enter_handle_write_not_permitted,
                2000: self.mmi_verify_secure_id,  # Enter pairing pin from DUT.
            },
            "SDP": {
                # TODO: Implement MMIs as necessary
            }
        }
        # Hand the dispatcher to the PTS controller (presumably invoked for
        # each MMI request -- confirm against bluetooth_pts_device).
        self.pts.bind_to(self.process_next_action)

    def teardown_class(self):
        """Release the PTS controller."""
        self.pts.clean_up()

    def setup_test(self):
        """Reset the PTS verdict before each test."""
        # Always start the test with RESULT_INCOMP
        self.pts.pts_test_result = VERDICT_STRINGS['RESULT_INCOMP']

    def teardown_test(self):
        """No per-test cleanup is needed; always returns True."""
        return True

    @staticmethod
    def pts_test_wrap(fn):
        """Decorator that logs a test run and reruns it on a non-True result.

        The wrapped test is run once. While its return value is not exactly
        True, it is rerun up to `pts_auto_rerun_count` times (user param,
        default 0), calling teardown_test() and setup_test() between attempts.
        Exceptions are never retried: TestSignal subclasses propagate
        untouched, any other exception is logged and re-raised.

        Args:
            fn: the test method to wrap.

        Returns:
            The wrapping function, which returns the result of the last run.
        """

        def _safe_wrap_test_case(self, *args, **kwargs):
            test_id = "{}:{}:{}".format(self.__class__.__name__, fn.__name__,
                                        time.time())
            log_string = "[Test ID] {}".format(test_id)
            self.log.info(log_string)
            try:
                self.dut.log_info("Started " + log_string)
                result = fn(self, *args, **kwargs)
                self.dut.log_info("Finished " + log_string)
                rerun_count = self.user_params.get("pts_auto_rerun_count", 0)
                for i in range(int(rerun_count)):
                    if result is True:
                        return result
                    self.teardown_test()
                    log_string = "[Rerun Test ID] {}. Run #{} run failed... Retrying".format(
                        test_id, i + 1)
                    self.log.info(log_string)
                    self.setup_test()
                    self.dut.log_info("Rerun Started " + log_string)
                    result = fn(self, *args, **kwargs)
                return result
            except TestSignal:
                raise
            except Exception as e:
                self.log.error(traceback.format_exc())
                self.log.error(str(e))
                raise

        return _safe_wrap_test_case

    def process_next_action(self, action):
        """Run the handler mapped to `action` for the current PTS profile.

        The profile is read from self.pts.pts_profile_mmi_request. Unknown
        profiles and unmapped actions are ignored.

        Args:
            action: MMI id used as the key into the profile's action mapping.
        """
        profile_actions = self.pts_action_mapping.get(
            self.pts.pts_profile_mmi_request, {})
        func = profile_actions.get(action)
        if func is not None:
            func()

    ### BEGIN A2DP MMI Actions ###

    def a2dp_mmi_iut_connectable(self):
        """Start the A2DP sink profile and make the DUT discoverable."""
        self.dut.start_profile_a2dp_sink()
        self.dut.set_discoverable(True)

    def a2dp_mmi_iut_accept_connect(self):
        """Start the A2DP sink profile and make the DUT discoverable."""
        self.dut.start_profile_a2dp_sink()
        self.dut.set_discoverable(True)

    def a2dp_mmi_initiate_open_stream(self):
        """Ask the DUT to initiate opening an A2DP stream."""
        self.dut.a2dp_initiate_open_stream()

    ### END A2DP MMI Actions ###

    ### BEGIN GATT MMI Actions ###

    def _parse_implicit_send_description(self, pattern):
        """Return group 1 of `pattern` matched against the PTS description.

        Args:
            pattern: regular expression with one capture group, searched in
                self.pts.current_implicit_send_description.

        Returns:
            The text captured by the first group.

        Raises:
            signals.TestFailure: if the pattern does not match.
        """
        description = self.pts.current_implicit_send_description
        match = re.search(pattern, description)
        if match is None:
            raise signals.TestFailure(
                "Unable to parse PTS description with pattern {!r}: {!r}".
                format(pattern, description))
        return match.group(1)

    def create_write_value_by_size(self, size):
        """Return a list of `size` byte values counting 0..255 repeatedly.

        Args:
            size: number of values to generate.

        Returns:
            A list of ints where element i equals i % 256.
        """
        return [i % 256 for i in range(size)]

    def mmi_send_write_command(self):
        """Write without response to the handle named in the description.

        The handle (hex) and size (decimal) are parsed from the PTS
        description.
        """
        handle = int(
            self._parse_implicit_send_description(r"handle = '(.*)'O with"),
            16)
        size = int(
            self._parse_implicit_send_description(r"with <= '(.*)' byte"))
        self.dut.gatt_client_write_characteristic_without_response_by_handle(
            self.peer_identifier, handle,
            self.create_write_value_by_size(size))

    def mmi_send_write_request(self):
        """Write (offset 0) to the handle named in the description.

        The handle (hex) and size (decimal) are parsed from the PTS
        description.
        """
        handle = int(
            self._parse_implicit_send_description(r"handle = '(.*)'O with"),
            16)
        size = int(
            self._parse_implicit_send_description(r"with <= '(.*)' byte"))
        offset = 0
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_send_prepare_write(self):
        """Write half the advertised size at a non-zero offset.

        The payload is half (rounded down) of the size parsed from the PTS
        description, written at an offset of half the payload size.
        """
        handle = int(
            self._parse_implicit_send_description(r"handle = '(.*)'O <="), 16)
        # Floor division replaces math.floor(x / 2); `math` was never imported.
        size = int(
            self._parse_implicit_send_description(r"<= '(.*)' byte")) // 2
        offset = int(size / 2)
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_iut_send_prepare_write_greater_offset(self):
        """Write one byte at the offset parsed from the PTS description."""
        handle = int(
            self._parse_implicit_send_description(r"handle = '(.*)'O and"),
            16)
        offset = int(
            self._parse_implicit_send_description(
                r"greater than '(.*)' byte"))
        size = 1
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_iut_send_prepare_write_greater(self):
        """Write, at offset 0, as many bytes as the parsed 'greater than' size."""
        handle = int(
            self._parse_implicit_send_description(r"handle = '(.*)'O with"),
            16)
        size = int(
            self._parse_implicit_send_description(
                r"greater than '(.*)' byte"))
        offset = 0
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_make_iut_connectable(self):
        """Start a connectable LE advertisement on the DUT."""
        # NOTE(review): `fuchsia_name` is not defined in this module, so this
        # handler raises NameError when invoked. Confirm the intended
        # advertised name and define it.
        adv_data = {
            "name": fuchsia_name,
            "appearance": None,
            "service_data": None,
            "tx_power_level": None,
            "service_uuids": None,
            "manufacturer_data": None,
            "uris": None,
        }
        scan_response = None
        connectable = True
        interval = 1000

        self.dut.start_le_advertisement(adv_data, scan_response, interval,
                                        connectable)

    def mmi_iut_enter_uuid_read_not_permitted(self):
        """Queue the configured read-not-permitted UUID as a PTS answer."""
        self.pts.extra_answers.append(
            self.characteristic_read_not_permitted_uuid)

    def mmi_iut_enter_handle_read_not_permitted(self):
        """Queue the configured read-not-permitted handle as a PTS answer."""
        self.pts.extra_answers.append(
            self.characteristic_read_not_permitted_handle)

    def mmi_iut_enter_handle_invalid(self):
        """Queue the configured invalid read handle as a PTS answer."""
        self.pts.extra_answers.append(self.characteristic_read_invalid_handle)

    def mmi_iut_enter_uuid_attribute_not_found(self):
        """Queue the configured attribute-not-found UUID as a PTS answer."""
        self.pts.extra_answers.append(
            self.characteristic_attribute_not_found_uuid)

    def mmi_iut_enter_handle_write_not_permitted(self):
        """Queue the configured write-not-permitted handle as a PTS answer."""
        self.pts.extra_answers.append(
            self.write_characteristic_not_permitted_handle)

    def mmi_verify_secure_id(self):
        """Queue the DUT's pairing pin as a PTS answer."""
        self.pts.extra_answers.append(self.dut.get_pairing_pin())

    def mmi_discover_service_uuid_range(self, uuid):
        """Ask the DUT to discover services for `uuid` on the connected peer.

        Args:
            uuid: forwarded to the DUT's discover-service-uuid-range call.
        """
        # NOTE(review): process_next_action invokes handlers without
        # arguments, so dispatching MMI 29 raises TypeError here. Confirm
        # where `uuid` should come from.
        self.dut.gatt_client_mmi_discover_service_uuid_range(
            self.peer_identifier, uuid)

    def mmi_iut_initiate_connection(self):
        """Scan for the PTS advertisement and connect to it over LE.

        Stores the scan result in self.peer_identifier and attempts the GATT
        connection up to three times.

        Raises:
            signals.TestFailure: if the scan finds nothing or every
                connection attempt fails.
        """
        autoconnect = False
        transport = gatt_transport['le']
        adv_name = "PTS"
        self.peer_identifier = self.dut.le_scan_with_name_filter(
            adv_name, self.scan_timeout_seconds)
        if self.peer_identifier is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")
        tries = 3
        for _ in range(tries):
            if self.dut.gatt_connect(self.peer_identifier, transport,
                                     autoconnect):
                return

        raise signals.TestFailure("Unable to connect to peripheral.")

    def mmi_iut_initiate_disconnection(self):
        """Disconnect from the peer.

        Raises:
            signals.TestFailure: if the DUT reports the disconnect failed.
        """
        if not self.dut.gatt_disconnect(self.peer_identifier):
            raise signals.TestFailure("Failed to disconnect from peer.")

    def mmi_discover_primary_service(self):
        """Trigger a GATT refresh on the DUT."""
        self.dut.gatt_refresh()

    def mmi_iut_find_included_services(self):
        """Trigger a GATT refresh on the DUT."""
        self.dut.gatt_refresh()

    def mmi_confirm_primary_service(self):
        """Placeholder confirmation; always returns True."""
        # TODO: Write verifier that 1800 and 1801 exists. For now just pass.
        return True

    def mmi_confirm_characteristic_service(self):
        """Placeholder confirmation; always returns True."""
        # TODO: Write verifier that no services exist. For now just pass.
        return True

    def mmi_confirm_include_service(self, uuid_description=None):
        """Placeholder confirmation; always returns True.

        Args:
            uuid_description: currently unused. Optional so the handler can be
                invoked by process_next_action, which passes no arguments.
        """
        # TODO: Write verifier that input services exist. For now just pass.
        # Note: List comes in the form of a long string to parse:
        # Attribute Handle = '0002'O Included Service Attribute handle = '0080'O,End Group Handle = '0085'O,Service UUID = 'A00B'O
        # \n
        # Attribute Handle = '0021'O Included Service Attribute handle = '0001'O,End Group Handle = '0006'O,Service UUID = 'A00D'O
        # \n ...
        return True

    def mmi_iut_send_read_characteristic_handle(self):
        """Read the characteristic whose hex handle is in the description."""
        handle = int(
            self._parse_implicit_send_description(r"handle = '(.*)'O to"), 16)
        self.dut.gatt_client_read_characteristic_by_handle(
            self.peer_identifier, handle)

    def mmi_iut_send_read_descriptor_handle(self):
        """Read the descriptor whose hex handle is in the description."""
        handle = int(
            self._parse_implicit_send_description(r"handle = '(.*)'O to"), 16)
        self.dut.gatt_client_descriptor_read_by_handle(self.peer_identifier,
                                                       handle)

    ### END GATT MMI Actions ###