1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts.controllers.fuchsia_lib.base_lib import BaseLib
18
19
class FuchsiaAvdtpLib(BaseLib):
    """Thin wrappers that issue "avdtp_facade.*" commands via send_command.

    Every public method builds a command name and an argument dictionary and
    forwards them to self.send_command, returning whatever that call returns.
    Peer-directed commands pass the peer under the "identifier" key.
    """

    def __init__(self, addr: str) -> None:
        """Initializes the library with the "avdtp" tag.

        Args:
            addr: Address forwarded unchanged to the BaseLib constructor.
        """
        super().__init__(addr, "avdtp")

    def init(self, initiator_delay=None):
        """Initializes the AVDTP service with optional initiator_delay.

        Args:
            initiator_delay: Optional. The delay in milliseconds to start a
            stream. Sent as-is (including None) under "initiator_delay".

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpInit"
        test_args = {"initiator_delay": initiator_delay}

        return self.send_command(test_cmd, test_args)

    def getConnectedPeers(self):
        """Gets the AVDTP connected peers.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetConnectedPeers"
        test_args = {}

        return self.send_command(test_cmd, test_args)

    def setConfiguration(self, peer_id):
        """Sends the AVDTP command to input peer_id: set configuration

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpSetConfiguration"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def getConfiguration(self, peer_id):
        """Sends the AVDTP command to input peer_id: get configuration

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetConfiguration"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def getCapabilities(self, peer_id):
        """Sends the AVDTP command to input peer_id: get capabilities

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetCapabilities"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def getAllCapabilities(self, peer_id):
        """Sends the AVDTP command to input peer_id: get all capabilities

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetAllCapabilities"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def reconfigureStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: reconfigure stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpReconfigureStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def suspendStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: suspend stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpSuspendStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def suspendAndReconfigure(self, peer_id):
        """Sends the AVDTP command to input peer_id: suspend and reconfigure

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpSuspendAndReconfigure"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def releaseStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: release stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpReleaseStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def establishStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: establish stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpEstablishStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def startStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: start stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpStartStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def abortStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: abort stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpAbortStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def removeService(self):
        """Removes the AVDTP service from the Fuchsia device

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpRemoveService"
        test_args = {}

        return self.send_command(test_cmd, test_args)
230