1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from acts.controllers.fuchsia_lib.base_lib import BaseLib 18 19 20class FuchsiaRfcommLib(BaseLib): 21 22 def __init__(self, addr: str) -> None: 23 super().__init__(addr, "rfcomm") 24 25 def init(self): 26 """Initializes the RFCOMM service. 27 28 Returns: 29 Dictionary, None if success, error if error. 30 """ 31 test_cmd = "rfcomm_facade.RfcommInit" 32 test_args = {} 33 34 return self.send_command(test_cmd, test_args) 35 36 def removeService(self): 37 """Removes the RFCOMM service from the Fuchsia device 38 39 Returns: 40 Dictionary, None if success, error if error. 41 """ 42 test_cmd = "rfcomm_facade.RfcommRemoveService" 43 test_args = {} 44 45 return self.send_command(test_cmd, test_args) 46 47 def disconnectSession(self, peer_id): 48 """Closes the RFCOMM Session with the remote peer 49 50 Returns: 51 Dictionary, None if success, error if error. 52 """ 53 test_cmd = "rfcomm_facade.DisconnectSession" 54 test_args = {"peer_id": peer_id} 55 56 return self.send_command(test_cmd, test_args) 57 58 def connectRfcommChannel(self, peer_id, server_channel_number): 59 """Makes an outgoing RFCOMM connection to the remote peer 60 61 Returns: 62 Dictionary, None if success, error if error. 63 """ 64 test_cmd = "rfcomm_facade.ConnectRfcommChannel" 65 test_args = { 66 "peer_id": peer_id, 67 "server_channel_number": server_channel_number 68 } 69 70 return self.send_command(test_cmd, test_args) 71 72 def disconnectRfcommChannel(self, peer_id, server_channel_number): 73 """Closes the RFCOMM channel with the remote peer 74 75 Returns: 76 Dictionary, None if success, error if error. 77 """ 78 test_cmd = "rfcomm_facade.DisconnectRfcommChannel" 79 test_args = { 80 "peer_id": peer_id, 81 "server_channel_number": server_channel_number 82 } 83 84 return self.send_command(test_cmd, test_args) 85 86 def sendRemoteLineStatus(self, peer_id, server_channel_number): 87 """Sends a Remote Line Status update to the remote peer for the provided channel number 88 89 Returns: 90 Dictionary, None if success, error if error. 91 """ 92 test_cmd = "rfcomm_facade.SendRemoteLineStatus" 93 test_args = { 94 "peer_id": peer_id, 95 "server_channel_number": server_channel_number 96 } 97 98 return self.send_command(test_cmd, test_args) 99 100 def writeRfcomm(self, peer_id, server_channel_number, data): 101 """Sends data to the remote peer over the RFCOMM channel 102 103 Returns: 104 Dictionary, None if success, error if error. 105 """ 106 test_cmd = "rfcomm_facade.RfcommWrite" 107 test_args = { 108 "peer_id": peer_id, 109 "server_channel_number": server_channel_number, 110 "data": data 111 } 112 113 return self.send_command(test_cmd, test_args) 114