1#!/usr/bin/env python3
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import queue
19
20from blueberry.tests.gd.cert.truth import assertThat
21
22
class L2cap:
    """Helper that drives LE L2CAP CoC socket calls on a device through SL4A.

    The object remembers whether a client connection and/or a server accept
    thread was started through it, so that the close methods only issue the
    matching teardown call when there is something to tear down.
    """

    # Used both as the event wait timeout and as the first argument to the
    # accept-thread call.
    __l2cap_connection_timeout = 10  #seconds
    # Class-level defaults; each instance overwrites them as its state changes.
    __device = None
    __active_client_coc = False
    __active_server_coc = False

    def __init__(self, device):
        """Remember the device whose `sl4a` and `ed` attributes are used below.

        Args:
            device: object exposing `sl4a` (socket RPC calls) and `ed`
                (provides `pop_event`). Presumably an Android device
                controller -- confirm against the caller.
        """
        self.__device = device

    def __wait_for_event(self, expected_event_name):
        """Wait for a named event from the device's event source.

        Args:
            expected_event_name: name passed to `ed.pop_event`.

        Returns:
            True if `pop_event` returned an event, False if it raised
            `queue.Empty`.
        """
        try:
            event_info = self.__device.ed.pop_event(expected_event_name, self.__l2cap_connection_timeout)
            logging.info(event_info)
        except queue.Empty:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def create_l2cap_le_coc(self, address, psm, secure):
        """Start a client connection and assert that it succeeds.

        Args:
            address: forwarded as the first argument of the connect call.
            psm: PSM value forwarded to the connect call.
            secure: forwarded to the connect call and logged.

        The client is only marked active after the
        "BluetoothSocketConnectSuccess" event has been observed; if the
        assertion fails, the flag stays False.
        """
        logging.info("creating l2cap channel with secure=%r and psm %s", secure, psm)
        # NOTE(review): the literal True is defined by the SL4A API (presumably
        # selects the BLE transport) -- confirm.
        self.__device.sl4a.bluetoothSocketConnBeginConnectThreadPsm(address, True, psm, secure)
        assertThat(self.__wait_for_event("BluetoothSocketConnectSuccess")).isTrue()
        self.__active_client_coc = True

    def listen_using_l2cap_le_coc(self, secure):
        """Start listening on the l2cap server socket and return the psm.

        Args:
            secure: forwarded to the accept-thread call and logged.

        Returns:
            Whatever `sl4a.bluetoothSocketConnGetPsm()` returns.
        """
        logging.info("Listening for l2cap channel with secure=%r", secure)
        self.__device.sl4a.bluetoothSocketConnBeginAcceptThreadPsm(self.__l2cap_connection_timeout, True, secure)
        self.__active_server_coc = True
        return self.__device.sl4a.bluetoothSocketConnGetPsm()

    def close_l2cap_le_coc_client(self):
        """Kill the client connection thread if one was started; else no-op."""
        if self.__active_client_coc:
            logging.info("Closing LE L2CAP CoC Client")
            self.__device.sl4a.bluetoothSocketConnKillConnThread()
            self.__active_client_coc = False

    def close_l2cap_le_coc_server(self):
        """End the server accept thread if one was started; else no-op."""
        if self.__active_server_coc:
            logging.info("Closing LE L2CAP CoC Server")
            self.__device.sl4a.bluetoothSocketConnEndAcceptThread()
            self.__active_server_coc = False

    def close(self):
        """Close any active client/server and drop the device reference.

        Safe to call more than once: after the first call both active flags
        are False, so no further device access happens.
        """
        self.close_l2cap_le_coc_client()
        self.close_l2cap_le_coc_server()
        # Assignment, not comparison: actually release the device handle.
        self.__device = None
71