1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts.controllers.fuchsia_lib.base_lib import BaseLib
18
19
20class FuchsiaRfcommLib(BaseLib):
21
22    def __init__(self, addr: str) -> None:
23        super().__init__(addr, "rfcomm")
24
25    def init(self):
26        """Initializes the RFCOMM service.
27
28        Returns:
29            Dictionary, None if success, error if error.
30        """
31        test_cmd = "rfcomm_facade.RfcommInit"
32        test_args = {}
33
34        return self.send_command(test_cmd, test_args)
35
36    def removeService(self):
37        """Removes the RFCOMM service from the Fuchsia device
38
39        Returns:
40            Dictionary, None if success, error if error.
41        """
42        test_cmd = "rfcomm_facade.RfcommRemoveService"
43        test_args = {}
44
45        return self.send_command(test_cmd, test_args)
46
47    def disconnectSession(self, peer_id):
48        """Closes the RFCOMM Session with the remote peer
49
50        Returns:
51            Dictionary, None if success, error if error.
52        """
53        test_cmd = "rfcomm_facade.DisconnectSession"
54        test_args = {"peer_id": peer_id}
55
56        return self.send_command(test_cmd, test_args)
57
58    def connectRfcommChannel(self, peer_id, server_channel_number):
59        """Makes an outgoing RFCOMM connection to the remote peer
60
61        Returns:
62            Dictionary, None if success, error if error.
63        """
64        test_cmd = "rfcomm_facade.ConnectRfcommChannel"
65        test_args = {
66            "peer_id": peer_id,
67            "server_channel_number": server_channel_number
68        }
69
70        return self.send_command(test_cmd, test_args)
71
72    def disconnectRfcommChannel(self, peer_id, server_channel_number):
73        """Closes the RFCOMM channel with the remote peer
74
75        Returns:
76            Dictionary, None if success, error if error.
77        """
78        test_cmd = "rfcomm_facade.DisconnectRfcommChannel"
79        test_args = {
80            "peer_id": peer_id,
81            "server_channel_number": server_channel_number
82        }
83
84        return self.send_command(test_cmd, test_args)
85
86    def sendRemoteLineStatus(self, peer_id, server_channel_number):
87        """Sends a Remote Line Status update to the remote peer for the provided channel number
88
89        Returns:
90            Dictionary, None if success, error if error.
91        """
92        test_cmd = "rfcomm_facade.SendRemoteLineStatus"
93        test_args = {
94            "peer_id": peer_id,
95            "server_channel_number": server_channel_number
96        }
97
98        return self.send_command(test_cmd, test_args)
99
100    def writeRfcomm(self, peer_id, server_channel_number, data):
101        """Sends data to the remote peer over the RFCOMM channel
102
103        Returns:
104            Dictionary, None if success, error if error.
105        """
106        test_cmd = "rfcomm_facade.RfcommWrite"
107        test_args = {
108            "peer_id": peer_id,
109            "server_channel_number": server_channel_number,
110            "data": data
111        }
112
113        return self.send_command(test_cmd, test_args)
114