#!/usr/bin/env python3
#
# Copyright 2022 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import queue

from blueberry.tests.gd.cert.truth import assertThat


class L2cap:
    """Helper that drives LE L2CAP connection-oriented channels (CoC) on a device.

    All operations are forwarded to the wrapped device's ``sl4a`` object, and
    events are read from its ``ed`` object. The helper remembers whether a
    client connection and/or a server accept thread was started through it so
    that ``close()`` only tears down what it opened.
    """

    # Timeout passed to ed.pop_event() and to the sl4a accept-thread call.
    __l2cap_connection_timeout = 10  # seconds
    # Device wrapper supplied by the caller; cleared again by close().
    __device = None
    # True after create_l2cap_le_coc() succeeded and until the client is closed.
    __active_client_coc = False
    # True after listen_using_l2cap_le_coc() was called and until the server is closed.
    __active_server_coc = False

    def __init__(self, device):
        """Store the device whose ``sl4a`` and ``ed`` attributes are used by this helper."""
        self.__device = device

    def __wait_for_event(self, expected_event_name):
        """Wait for the named event on the device's event dispatcher.

        Args:
            expected_event_name: name of the event to pop from ``device.ed``.

        Returns:
            True if the event arrived before the connection timeout expired,
            False if ``pop_event`` raised ``queue.Empty``.
        """
        try:
            event_info = self.__device.ed.pop_event(expected_event_name, self.__l2cap_connection_timeout)
            logging.info(event_info)
        except queue.Empty:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def create_l2cap_le_coc(self, address, psm, secure):
        """Start a client connection to ``address`` on ``psm`` and assert that it succeeds.

        Args:
            address: remote address forwarded to sl4a.
            psm: PSM value forwarded to sl4a.
            secure: forwarded to sl4a; also logged.

        The call asserts (via ``assertThat``) that a
        "BluetoothSocketConnectSuccess" event is observed before the timeout.
        """
        logging.info("creating l2cap channel with secure=%r and psm %s", secure, psm)
        self.__device.sl4a.bluetoothSocketConnBeginConnectThreadPsm(address, True, psm, secure)
        assertThat(self.__wait_for_event("BluetoothSocketConnectSuccess")).isTrue()
        self.__active_client_coc = True

    def listen_using_l2cap_le_coc(self, secure):
        """Start listening on the l2cap server socket and return the psm.

        Args:
            secure: forwarded to sl4a; also logged.

        Returns:
            Whatever ``sl4a.bluetoothSocketConnGetPsm()`` reports.
        """
        logging.info("Listening for l2cap channel with secure=%r", secure)
        self.__device.sl4a.bluetoothSocketConnBeginAcceptThreadPsm(self.__l2cap_connection_timeout, True, secure)
        self.__active_server_coc = True
        return self.__device.sl4a.bluetoothSocketConnGetPsm()

    def close_l2cap_le_coc_client(self):
        """Kill the client connection thread if one was started through this helper."""
        if self.__active_client_coc:
            logging.info("Closing LE L2CAP CoC Client")
            self.__device.sl4a.bluetoothSocketConnKillConnThread()
            self.__active_client_coc = False

    def close_l2cap_le_coc_server(self):
        """End the server accept thread if one was started through this helper."""
        if self.__active_server_coc:
            logging.info("Closing LE L2CAP CoC Server")
            self.__device.sl4a.bluetoothSocketConnEndAcceptThread()
            self.__active_server_coc = False

    def close(self):
        """Close any active client/server channel and drop the device reference."""
        self.close_l2cap_le_coc_client()
        self.close_l2cap_le_coc_server()
        # Assignment, not comparison: the previous `== None` was a no-op that
        # left the device reference in place after close().
        self.__device = None