1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from blueberry.tests.gd.cert.closable import Closable
20from blueberry.tests.gd.cert.closable import safeClose
21from blueberry.tests.gd.cert.event_stream import EventStream
22from blueberry.tests.gd.cert.truth import assertThat
23from blueberry.facade import common_pb2 as common
24from google.protobuf import empty_pb2 as empty_proto
25
26from blueberry.facade.security.facade_pb2 import AuthenticationRequirements
27from blueberry.facade.security.facade_pb2 import AuthenticationRequirementsMessage
28from blueberry.facade.security.facade_pb2 import SecurityPolicyMessage
29from blueberry.facade.security.facade_pb2 import IoCapabilities
30from blueberry.facade.security.facade_pb2 import IoCapabilityMessage
31from blueberry.facade.security.facade_pb2 import OobDataBondMessage
32from blueberry.facade.security.facade_pb2 import OobDataMessage
33from blueberry.facade.security.facade_pb2 import UiMsgType
34from blueberry.facade.security.facade_pb2 import UiCallbackMsg
35from blueberry.facade.security.facade_pb2 import UiCallbackType
36
37
class PySecurity(Closable):
    """
        Abstraction for security tasks and GRPC calls

        On construction this opens one EventStream per security facade event
        source of the given device.  The remaining methods either issue a
        facade call or wait for a matching event on one of those streams.
        close() releases every stream.
    """

    # Enum value -> readable name, used only to build log messages.
    _io_capabilities_name_lookup = {
        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
        IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY",
        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
    }

    # Enum value -> readable name, used only to build log messages.
    _auth_reqs_name_lookup = {
        AuthenticationRequirements.NO_BONDING: "NO_BONDING",
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION: "NO_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.DEDICATED_BONDING: "DEDICATED_BONDING",
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION: "DEDICATED_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.GENERAL_BONDING: "GENERAL_BONDING",
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION: "GENERAL_BONDING_MITM_PROTECTION",
    }

    # Class-level defaults for every stream that close() touches, so that
    # close() does not hit an AttributeError when __init__ raised before all
    # of them were assigned.
    # NOTE(review): this relies on safeClose() tolerating None - confirm.
    _ui_event_stream = None
    _bond_event_stream = None
    _enforce_security_policy_stream = None
    _disconnect_event_stream = None
    _oob_data_event_stream = None

    def __init__(self, device):
        """
            Wait for the device channel to become ready, then open the UI,
            bond, enforce-security-policy, disconnect and out-of-band data
            event streams of its security facade.

            device is expected to provide wait_channel_ready(), an address
            attribute and a security facade stub.
        """
        logging.info("DUT: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
        self._enforce_security_policy_stream = EventStream(
            self._device.security.FetchEnforceSecurityPolicyEvents(empty_proto.Empty()))
        self._disconnect_event_stream = EventStream(self._device.security.FetchDisconnectEvents(empty_proto.Empty()))
        self._oob_data_event_stream = EventStream(
            self._device.security.FetchGetOutOfBandDataEvents(empty_proto.Empty()))

    @staticmethod
    def _address_with_type(address, address_type):
        """
            Build a fresh BluetoothAddressWithType message from a raw address
            and an address type value.
        """
        return common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=address_type)

    def create_bond(self, address, type):
        """
            Triggers stack under test to create bond

            address and type are forwarded as a BluetoothAddressWithType.
            (The parameter name 'type' shadows the builtin; it is kept so
            that keyword callers continue to work.)
        """
        logging.info("DUT: Creating bond to '%s' from '%s'", address, self._device.address)
        self._device.security.CreateBond(self._address_with_type(address, type))

    def create_bond_out_of_band(self, address, type, p192_oob_data, p256_oob_data):
        """
            Triggers stack under test to create bond using Out of Band method

            p192_oob_data and p256_oob_data are indexable: element 0 is sent
            as the confirmation value and element 1 as the random value.
            Each element is converted with bytes().
        """
        logging.info("DUT: Creating OOB bond to '%s' from '%s'", address, self._device.address)

        p192_message = OobDataMessage(
            address=self._address_with_type(address, type),
            confirmation_value=bytes(p192_oob_data[0]),
            random_value=bytes(p192_oob_data[1]))
        p256_message = OobDataMessage(
            address=self._address_with_type(address, type),
            confirmation_value=bytes(p256_oob_data[0]),
            random_value=bytes(p256_oob_data[1]))

        self._device.security.CreateBondOutOfBand(
            OobDataBondMessage(
                address=self._address_with_type(address, type), p192_data=p192_message, p256_data=p256_message))

    def remove_bond(self, address, type):
        """
            Removes bond from stack under test
        """
        self._device.security.RemoveBond(self._address_with_type(address, type))

    def set_io_capabilities(self, io_capabilities):
        """
            Set the IO Capabilities used for the DUT

            Values missing from the name lookup are logged as "ERROR" but
            are still forwarded to the facade unchanged.
        """
        logging.info("DUT: setting IO Capabilities data to '%s'",
                     self._io_capabilities_name_lookup.get(io_capabilities, "ERROR"))
        self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities))

    def set_authentication_requirements(self, auth_reqs):
        """
            Establish authentication requirements for the stack

            Values missing from the name lookup are logged as "ERROR" but
            are still forwarded to the facade unchanged.
        """
        logging.info("DUT: setting Authentication Requirements data to '%s'",
                     self._auth_reqs_name_lookup.get(auth_reqs, "ERROR"))
        self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs))

    def __send_ui_callback(self, address, callback_type, b, uid, pin):
        """
            Send a callback from the UI as if the user pressed a button on the dialog

            The address is always sent with the PUBLIC_DEVICE_ADDRESS type.
            pin is converted with bytes(), so an empty sequence becomes b"".
        """
        logging.info("DUT: Sending user input response uid: %d; response: %s", uid, b)
        self._device.security.SendUiCallback(
            UiCallbackMsg(
                message_type=callback_type,
                boolean=b,
                unique_id=uid,
                pin=bytes(pin),
                address=self._address_with_type(address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)))

    def enable_secure_simple_pairing(self):
        """
            This is called when you want to enable SSP for testing
            Since the stack under test already enables it by default
            we do not need to do anything here for the time being
        """
        pass

    def enable_secure_connections(self):
        """
            Intentionally a no-op; nothing is sent to the stack under test.
        """
        pass

    def accept_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def accept_oob_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def wait_for_passkey(self, cert_address):
        """
            Wait for a DISPLAY_PASSKEY UI event and return its numeric value

            cert_address is accepted but not used by this implementation.
        """
        passkey = -1

        def get_passkey(event):
            nonlocal passkey
            if event.message_type != UiMsgType.DISPLAY_PASSKEY:
                return False
            passkey = event.numeric_value
            return True

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_passkey)
        return passkey

    def input_pin(self, cert_address, pin):
        """
            Respond to a DISPLAY_PIN_ENTRY UI event with the given pin
        """
        logging.info("DUT: Inputting pin code: %s", pin)
        self.on_user_input(
            cert_address=cert_address, reply_boolean=True, expected_ui_event=UiMsgType.DISPLAY_PIN_ENTRY, pin=pin)

    def on_user_input(self, cert_address, reply_boolean, expected_ui_event, pin=None):
        """
            Respond to the UI event

            Waits until the UI stream emits an event whose message_type is
            expected_ui_event, then answers it with that event's unique id.
            Does nothing when expected_ui_event is None.

            pin defaults to "no pin" (None or any empty sequence), which
            sends a YES_NO callback; a non-empty pin sends a PIN callback.
        """
        if expected_ui_event is None:
            return

        # None replaces the former mutable default ([]); behaviour for callers
        # that omit the argument is unchanged.
        if pin is None:
            pin = []

        ui_id = -1

        def get_unique_id(event):
            nonlocal ui_id
            if event.message_type != expected_ui_event:
                return False
            ui_id = event.unique_id
            return True

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_unique_id)
        callback_type = UiCallbackType.PIN if pin else UiCallbackType.YES_NO
        self.__send_ui_callback(cert_address, callback_type, reply_boolean, ui_id, pin)

    def get_address(self):
        """
            Return the address attribute of the wrapped device
        """
        return self._device.address

    def wait_for_bond_event(self, expected_bond_event):
        """
            A bond event will be triggered once the bond process
            is complete.  For the DUT we need to wait for it,
            for Cert it isn't needed.

            Every non-matching bond event seen while waiting is logged.
        """
        logging.info("DUT: Waiting for Bond Event: %s ", expected_bond_event)

        def is_expected_bond_event(event):
            if event.message_type == expected_bond_event:
                return True
            logging.info("DUT: Actual Bond Event: %s", event.message_type)
            return False

        assertThat(self._bond_event_stream).emits(is_expected_bond_event)

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
            We expect a 'True' or 'False' from the enforce security call

            This interface will allow the caller to wait for a callback
            result from enforcing security policy over the facade.

            Every non-matching result seen while waiting is logged.
        """
        logging.info("DUT: Waiting for enforce security event")

        def is_expected_result(event):
            if event.result == expected_enforce_security_event:
                return True
            logging.info(event.result)
            return False

        assertThat(self._enforce_security_policy_stream).emits(is_expected_result)

    def wait_for_disconnect_event(self):
        """
            Wait for any disconnect event and log its address

            The matcher accepts the first event it is given; nothing is
            returned to the caller.
        """
        logging.info("DUT: Waiting for Disconnect Event")

        def log_and_accept(event):
            logging.info("event: %s", event.address)
            return True

        assertThat(self._disconnect_event_stream).emits(log_and_accept)

    def enforce_security_policy(self, address, type, policy):
        """
            Call to enforce classic security policy
        """
        self._device.security.EnforceSecurityPolicy(
            SecurityPolicyMessage(address=self._address_with_type(address, type), policy=policy))

    def get_oob_data_from_controller(self, oob_data_present):
        """
            Request out-of-band data from the stack and wait for the result

            Returns a list of four lists: the p192 confirmation value, the
            p192 random value, and two 16-element all-zero placeholders
            (presumably the P-256 slots - confirm against callers).

            oob_data_present is accepted but not used by this implementation.
        """
        self._device.security.GetOutOfBandData(empty_proto.Empty())
        oob_data = []

        def get_oob_data(event):
            nonlocal oob_data
            oob_data = [
                list(event.p192_data.confirmation_value),
                list(event.p192_data.random_value),
                [0] * 16,
                [0] * 16,
            ]
            # Accept the first event delivered on the stream.
            return True

        assertThat(self._oob_data_event_stream).emits(get_oob_data)
        return oob_data

    def close(self):
        """
            Close every event stream opened by __init__
        """
        safeClose(self._ui_event_stream)
        safeClose(self._bond_event_stream)
        safeClose(self._enforce_security_policy_stream)
        safeClose(self._disconnect_event_stream)
        safeClose(self._oob_data_event_stream)
277