1#!/usr/bin/env python3 2# 3# Copyright 2022 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import asyncio 17import grpc 18 19from blueberry.facade.topshim import facade_pb2 20from blueberry.facade.topshim import facade_pb2_grpc 21from blueberry.tests.topshim.lib.async_closable import AsyncClosable 22 23from google.protobuf import empty_pb2 as empty_proto 24 25 26class HfpClient(AsyncClosable): 27 """ 28 Wrapper gRPC interface to the HFP Service 29 """ 30 # Timeout for async wait 31 DEFAULT_TIMEOUT = 2 32 __task_list = [] 33 __channel = None 34 __hfp_stub = None 35 __hfp_event_stream = None 36 37 def __init__(self, port=8999): 38 self.__channel = grpc.aio.insecure_channel("localhost:%d" % port) 39 self.__hfp_stub = facade_pb2_grpc.HfpServiceStub(self.__channel) 40 self.__hfp_event_stream = self.__hfp_stub.FetchEvents(facade_pb2.FetchEventsRequest()) 41 42 async def close(self): 43 """ 44 Terminate the current tasks. 45 """ 46 for task in self.__task_list: 47 task.cancel() 48 task = None 49 self.__task_list.clear() 50 await self.__channel.close() 51 52 async def start_slc(self, address): 53 """ 54 """ 55 await self.__hfp_stub.StartSlc( 56 facade_pb2.StartSlcRequest(connection=facade_pb2.Connection(cookie=address.encode()))) 57 return await self._listen_for_event(facade_pb2.EventType.HFP_CONNECTION_STATE) 58 59 async def stop_slc(self, address): 60 """ 61 """ 62 await self.__hfp_stub.StopSlc( 63 facade_pb2.StopSlcRequest(connection=facade_pb2.Connection(cookie=address.encode()))) 64 return await self._listen_for_event(facade_pb2.EventType.HFP_CONNECTION_STATE) 65 66 async def connect_audio(self, address, is_sco_offload_enabled=False, disabled_codecs=0): 67 """ 68 """ 69 await self.__hfp_stub.ConnectAudio( 70 facade_pb2.ConnectAudioRequest(connection=facade_pb2.Connection(cookie=address.encode()), 71 is_sco_offload_enabled=is_sco_offload_enabled, 72 disabled_codecs=disabled_codecs)) 73 74 async def disconnect_audio(self, address): 75 """ 76 """ 77 await self.__hfp_stub.DisconnectAudio( 78 facade_pb2.DisconnectAudioRequest(connection=facade_pb2.Connection(cookie=address.encode()))) 79 80 async def set_volume(self, address, volume): 81 """ 82 """ 83 await self.__hfp_stub.DisconnectAudio( 84 facade_pb2.DisconnectAudioRequest(connection=facade_pb2.Connection(cookie=address.encode()), volume=volume)) 85 86 async def wait_for_hfp_connection_state_change(self): 87 return await self._listen_for_event(facade_pb2.EventType.HFP_CONNECTION_STATE) 88 89 async def __get_next_event(self, event, future): 90 """Get the future of next event from the stream""" 91 while True: 92 e = await self.__hfp_event_stream.read() 93 94 # Match event by some condition. 95 if e.event_type == event: 96 future.set_result(e.data) 97 break 98 else: 99 print("Got '%s'; expecting '%s'" % (e.event_type, event)) 100 print(e) 101 102 async def _listen_for_event(self, event): 103 """Start fetching events""" 104 future = asyncio.get_running_loop().create_future() 105 self.__task_list.append(asyncio.get_running_loop().create_task(self.__get_next_event(event, future))) 106 try: 107 await asyncio.wait_for(future, HfpClient.DEFAULT_TIMEOUT) 108 except: 109 print("Failed to get event", event) 110 return future 111