1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import asyncio 18import grpc 19 20from blueberry.facade.topshim import facade_pb2 21from blueberry.facade.topshim import facade_pb2_grpc 22from blueberry.tests.topshim.lib.async_closable import AsyncClosable 23 24from google.protobuf import empty_pb2 as empty_proto 25 26 27class AdapterClient(AsyncClosable): 28 """ 29 Wrapper gRPC interface to the Topshim/BTIF layer 30 """ 31 # Timeout for async wait 32 DEFAULT_TIMEOUT = 2 33 __task_list = [] 34 __channel = None 35 __adapter_stub = None 36 __adapter_event_stream = None 37 38 def __init__(self, port=8999): 39 self.__channel = grpc.aio.insecure_channel("localhost:%d" % port) 40 self.__adapter_stub = facade_pb2_grpc.AdapterServiceStub(self.__channel) 41 self.__adapter_event_stream = self.__adapter_stub.FetchEvents(facade_pb2.FetchEventsRequest()) 42 43 async def close(self): 44 for task in self.__task_list: 45 if task.done() or task.cancelled(): 46 continue 47 task.cancel() 48 self.__task_list.clear() 49 await self.__channel.close() 50 51 async def __get_next_event(self, event, future): 52 """Get the future of next event from the stream""" 53 while True: 54 e = await self.__adapter_event_stream.read() 55 56 # Match event by some condition. 57 if e.event_type == event: 58 future.set_result(e.params) 59 break 60 else: 61 print("Got '%s'; expecting '%s'" % (e.event_type, event)) 62 print(e) 63 64 async def _listen_for_event(self, event): 65 """Start fetching events""" 66 future = asyncio.get_running_loop().create_future() 67 task = asyncio.get_running_loop().create_task(self.__get_next_event(event, future)) 68 self.__task_list.append(task) 69 try: 70 await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT) 71 except: 72 task.cancel() 73 print("Failed to get event", event) 74 return future 75 76 async def _verify_adapter_started(self): 77 future = await self._listen_for_event(facade_pb2.EventType.ADAPTER_STATE) 78 params = future.result() 79 return params["state"].data[0] == "ON" 80 81 async def toggle_stack(self, is_start=True): 82 """Enable/disable the stack""" 83 await self.__adapter_stub.ToggleStack(facade_pb2.ToggleStackRequest(start_stack=is_start)) 84 return await self._verify_adapter_started() 85 86 async def enable_inquiry_scan(self): 87 """Enable inquiry scan (Required to make device connectable and discoverable by other devices)""" 88 await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_inquiry_scan=True)) 89 return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY) 90 91 async def enable_page_scan(self): 92 """Enable page scan (might be used for A2dp sink to be discoverable)""" 93 await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=True)) 94 return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY) 95 96 async def disable_page_scan(self): 97 """Enable page scan (might be used for A2dp sink to be discoverable)""" 98 await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=False)) 99 return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY) 100 101 async def clear_event_filter(self): 102 await self.__adapter_stub.ClearEventFilter(empty_proto.Empty()) 103 104 async def clear_event_mask(self): 105 await self.__adapter_stub.ClearEventMask(empty_proto.Empty()) 106 107 async def clear_filter_accept_list(self): 108 await self.__adapter_stub.ClearFilterAcceptList(empty_proto.Empty()) 109 110 async def disconnect_all_acls(self): 111 await self.__adapter_stub.DisconnectAllAcls(empty_proto.Empty()) 112 113 async def le_rand(self): 114 await self.__adapter_stub.LeRand(empty_proto.Empty()) 115 future = await self._listen_for_event(facade_pb2.EventType.LE_RAND) 116 params = future.result() 117 return params["data"].data[0] 118 119 async def restore_filter_accept_list(self): 120 await self.__adapter_stub.RestoreFilterAcceptList(empty_proto.Empty()) 121 122 async def set_default_event_mask_except(self, mask, le_mask): 123 await self.__adapter_stub.SetDefaultEventMaskExcept( 124 facade_pb2.SetDefaultEventMaskExceptRequest(mask=mask, le_mask=le_mask)) 125 126 async def set_event_filter_inquiry_result_all_devices(self): 127 await self.__adapter_stub.SetEventFilterInquiryResultAllDevices(empty_proto.Empty()) 128 129 async def set_event_filter_connection_setup_all_devices(self): 130 await self.__adapter_stub.SetEventFilterConnectionSetupAllDevices(empty_proto.Empty()) 131 132 async def allow_wake_by_hid(self): 133 await self.__adapter_stub.AllowWakeByHid(empty_proto.Empty()) 134 135 async def set_local_io_caps(self, io_capability): 136 await self.__adapter_stub.SetLocalIoCaps(facade_pb2.SetLocalIoCapsRequest(io_capability=io_capability)) 137 return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY) 138 139 async def toggle_discovery(self, is_start): 140 await self.__adapter_stub.ToggleDiscovery(facade_pb2.ToggleDiscoveryRequest(is_start=is_start)) 141 future = await self._listen_for_event(facade_pb2.EventType.DISCOVERY_STATE) 142 return future 143 144 async def find_device(self): 145 return await self._listen_for_event(facade_pb2.EventType.DEVICE_FOUND) 146 147 148class A2dpAutomationHelper(): 149 """Invoke gRPC on topshim for A2DP testing""" 150 151 def __init__(self, port=8999): 152 self.__channel = grpc.insecure_channel("localhost:%d" % port) 153 self.media_stub = facade_pb2_grpc.MediaServiceStub(self.__channel) 154 155 """Start A2dp source profile service""" 156 157 def start_source(self): 158 self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_source=True)) 159 160 """Start A2dp sink profile service""" 161 162 def start_sink(self): 163 self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_sink=True)) 164 165 """Initialize an A2dp connection from source to sink""" 166 167 def source_connect_to_remote(self, address="11:22:33:44:55:66"): 168 self.media_stub.A2dpSourceConnect(facade_pb2.A2dpSourceConnectRequest(address=address)) 169