1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18 19from blueberry.tests.gd.cert.closable import Closable 20from blueberry.tests.gd.cert.closable import safeClose 21from blueberry.tests.gd.cert.event_stream import EventStream 22from blueberry.tests.gd.cert.truth import assertThat 23from blueberry.facade import common_pb2 as common 24from google.protobuf import empty_pb2 as empty_proto 25 26from blueberry.facade.security.facade_pb2 import AuthenticationRequirements 27from blueberry.facade.security.facade_pb2 import AuthenticationRequirementsMessage 28from blueberry.facade.security.facade_pb2 import SecurityPolicyMessage 29from blueberry.facade.security.facade_pb2 import IoCapabilities 30from blueberry.facade.security.facade_pb2 import IoCapabilityMessage 31from blueberry.facade.security.facade_pb2 import OobDataBondMessage 32from blueberry.facade.security.facade_pb2 import OobDataMessage 33from blueberry.facade.security.facade_pb2 import UiMsgType 34from blueberry.facade.security.facade_pb2 import UiCallbackMsg 35from blueberry.facade.security.facade_pb2 import UiCallbackType 36 37 38class PySecurity(Closable): 39 """ 40 Abstraction for security tasks and GRPC calls 41 """ 42 43 _io_capabilities_name_lookup = { 44 IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY", 45 IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP", 46 IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY", 47 IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT", 48 } 49 50 _auth_reqs_name_lookup = { 51 AuthenticationRequirements.NO_BONDING: "NO_BONDING", 52 AuthenticationRequirements.NO_BONDING_MITM_PROTECTION: "NO_BONDING_MITM_PROTECTION", 53 AuthenticationRequirements.DEDICATED_BONDING: "DEDICATED_BONDING", 54 AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION: "DEDICATED_BONDING_MITM_PROTECTION", 55 AuthenticationRequirements.GENERAL_BONDING: "GENERAL_BONDING", 56 AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION: "GENERAL_BONDING_MITM_PROTECTION", 57 } 58 59 _ui_event_stream = None 60 _bond_event_stream = None 61 _oob_data_event_stream = None 62 63 def __init__(self, device): 64 logging.info("DUT: Init") 65 self._device = device 66 self._device.wait_channel_ready() 67 self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty())) 68 self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty())) 69 self._enforce_security_policy_stream = EventStream( 70 self._device.security.FetchEnforceSecurityPolicyEvents(empty_proto.Empty())) 71 self._disconnect_event_stream = EventStream(self._device.security.FetchDisconnectEvents(empty_proto.Empty())) 72 self._oob_data_event_stream = EventStream( 73 self._device.security.FetchGetOutOfBandDataEvents(empty_proto.Empty())) 74 75 def create_bond(self, address, type): 76 """ 77 Triggers stack under test to create bond 78 """ 79 logging.info("DUT: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address))) 80 self._device.security.CreateBond( 81 common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)) 82 83 def create_bond_out_of_band(self, address, type, p192_oob_data, p256_oob_data): 84 """ 85 Triggers stack under test to create bond using Out of Band method 86 """ 87 88 logging.info("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address))) 89 90 self._device.security.CreateBondOutOfBand( 91 OobDataBondMessage( 92 address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type), 93 p192_data=OobDataMessage( 94 address=common.BluetoothAddressWithType( 95 address=common.BluetoothAddress(address=address), type=type), 96 confirmation_value=bytes(bytearray(p192_oob_data[0])), 97 random_value=bytes(bytearray(p192_oob_data[1]))), 98 p256_data=OobDataMessage( 99 address=common.BluetoothAddressWithType( 100 address=common.BluetoothAddress(address=address), type=type), 101 confirmation_value=bytes(bytearray(p256_oob_data[0])), 102 random_value=bytes(bytearray(p256_oob_data[1]))))) 103 104 def remove_bond(self, address, type): 105 """ 106 Removes bond from stack under test 107 """ 108 self._device.security.RemoveBond( 109 common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)) 110 111 def set_io_capabilities(self, io_capabilities): 112 """ 113 Set the IO Capabilities used for the DUT 114 """ 115 logging.info("DUT: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get( 116 io_capabilities, "ERROR")) 117 self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities)) 118 119 def set_authentication_requirements(self, auth_reqs): 120 """ 121 Establish authentication requirements for the stack 122 """ 123 logging.info("DUT: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get( 124 auth_reqs, "ERROR")) 125 self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs)) 126 127 def __send_ui_callback(self, address, callback_type, b, uid, pin): 128 """ 129 Send a callback from the UI as if the user pressed a button on the dialog 130 """ 131 logging.info("DUT: Sending user input response uid: %d; response: %s" % (uid, b)) 132 self._device.security.SendUiCallback( 133 UiCallbackMsg( 134 message_type=callback_type, 135 boolean=b, 136 unique_id=uid, 137 pin=bytes(pin), 138 address=common.BluetoothAddressWithType( 139 address=common.BluetoothAddress(address=address), 140 type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS))) 141 142 def enable_secure_simple_pairing(self): 143 """ 144 This is called when you want to enable SSP for testing 145 Since the stack under test already enables it by default 146 we do not need to do anything here for the time being 147 """ 148 pass 149 150 def enable_secure_connections(self): 151 pass 152 153 def accept_pairing(self, cert_address, reply_boolean): 154 """ 155 Here we pass, but in cert we perform pairing flow tasks. 156 This was added here in order to be more dynamic, but the stack 157 under test will handle the pairing flow. 158 """ 159 pass 160 161 def accept_oob_pairing(self, cert_address, reply_boolean): 162 """ 163 Here we pass, but in cert we perform pairing flow tasks. 164 This was added here in order to be more dynamic, but the stack 165 under test will handle the pairing flow. 166 """ 167 pass 168 169 def wait_for_passkey(self, cert_address): 170 """ 171 Respond to the UI event 172 """ 173 passkey = -1 174 175 def get_passkey(event): 176 if event.message_type == UiMsgType.DISPLAY_PASSKEY: 177 nonlocal passkey 178 passkey = event.numeric_value 179 return True 180 return False 181 182 logging.info("DUT: Waiting for expected UI event") 183 assertThat(self._ui_event_stream).emits(get_passkey) 184 return passkey 185 186 def input_pin(self, cert_address, pin): 187 """ 188 Respond to the UI event 189 """ 190 logging.info("DUT: Inputting pin code: %s" % str(pin)) 191 self.on_user_input( 192 cert_address=cert_address, reply_boolean=True, expected_ui_event=UiMsgType.DISPLAY_PIN_ENTRY, pin=pin) 193 194 def on_user_input(self, cert_address, reply_boolean, expected_ui_event, pin=[]): 195 """ 196 Respond to the UI event 197 """ 198 if expected_ui_event is None: 199 return 200 201 ui_id = -1 202 203 def get_unique_id(event): 204 if event.message_type == expected_ui_event: 205 nonlocal ui_id 206 ui_id = event.unique_id 207 return True 208 return False 209 210 logging.info("DUT: Waiting for expected UI event") 211 assertThat(self._ui_event_stream).emits(get_unique_id) 212 callback_type = UiCallbackType.YES_NO if len(pin) == 0 else UiCallbackType.PIN 213 self.__send_ui_callback(cert_address, callback_type, reply_boolean, ui_id, pin) 214 215 def get_address(self): 216 return self._device.address 217 218 def wait_for_bond_event(self, expected_bond_event): 219 """ 220 A bond event will be triggered once the bond process 221 is complete. For the DUT we need to wait for it, 222 for Cert it isn't needed. 223 """ 224 logging.info("DUT: Waiting for Bond Event: %s " % expected_bond_event) 225 assertThat(self._bond_event_stream).emits( 226 lambda event: event.message_type == expected_bond_event or logging.info("DUT: Actual Bond Event: %s" % event.message_type) 227 ) 228 229 def wait_for_enforce_security_event(self, expected_enforce_security_event): 230 """ 231 We expect a 'True' or 'False' from the enforce security call 232 233 This interface will allow the caller to wait for a callback 234 result from enforcing security policy over the facade. 235 """ 236 logging.info("DUT: Waiting for enforce security event") 237 assertThat(self._enforce_security_policy_stream).emits( 238 lambda event: event.result == expected_enforce_security_event or logging.info(event.result)) 239 240 def wait_for_disconnect_event(self): 241 """ 242 The Address is expected to be returned 243 """ 244 logging.info("DUT: Waiting for Disconnect Event") 245 assertThat(self._disconnect_event_stream).emits(lambda event: logging.info("event: %s" % event.address) or True) 246 247 def enforce_security_policy(self, address, type, policy): 248 """ 249 Call to enforce classic security policy 250 """ 251 self._device.security.EnforceSecurityPolicy( 252 SecurityPolicyMessage( 253 address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type), 254 policy=policy)) 255 256 def get_oob_data_from_controller(self, oob_data_present): 257 self._device.security.GetOutOfBandData(empty_proto.Empty()) 258 oob_data = [] 259 260 def get_oob_data(event): 261 nonlocal oob_data 262 oob_data = [ 263 list(event.p192_data.confirmation_value), 264 list(event.p192_data.random_value), [0 for i in range(0, 16)], [0 for i in range(0, 16)] 265 ] 266 return True 267 268 assertThat(self._oob_data_event_stream).emits(get_oob_data) 269 return oob_data 270 271 def close(self): 272 safeClose(self._ui_event_stream) 273 safeClose(self._bond_event_stream) 274 safeClose(self._enforce_security_policy_stream) 275 safeClose(self._disconnect_event_stream) 276 safeClose(self._oob_data_event_stream) 277