#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from acts.controllers.fuchsia_lib.base_lib import BaseLib


class FuchsiaAvdtpLib(BaseLib):
    """Wrapper that issues "avdtp_facade.*" commands for a Fuchsia device.

    Every method builds a command name and an argument dictionary and hands
    them to self.send_command (expected to be provided by BaseLib), returning
    whatever that call returns.
    """

    def __init__(self, addr: str) -> None:
        """Sets up the library for the given address.

        Args:
            addr: Address forwarded unchanged to the BaseLib constructor,
                together with the "avdtp" tag.
        """
        super().__init__(addr, "avdtp")

    def init(self, initiator_delay=None):
        """Initializes the AVDTP service with optional initiator_delay.

        Args:
            initiator_delay: Optional. The delay in milliseconds to start a
            stream.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpInit"
        test_args = {"initiator_delay": initiator_delay}

        return self.send_command(test_cmd, test_args)

    def getConnectedPeers(self):
        """Gets the AVDTP connected peers.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetConnectedPeers"
        test_args = {}

        return self.send_command(test_cmd, test_args)

    def setConfiguration(self, peer_id):
        """Sends the AVDTP command to input peer_id: set configuration

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpSetConfiguration"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def getConfiguration(self, peer_id):
        """Sends the AVDTP command to input peer_id: get configuration

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetConfiguration"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def getCapabilities(self, peer_id):
        """Sends the AVDTP command to input peer_id: get capabilities

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetCapabilities"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def getAllCapabilities(self, peer_id):
        """Sends the AVDTP command to input peer_id: get all capabilities

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpGetAllCapabilities"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def reconfigureStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: reconfigure stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpReconfigureStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def suspendStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: suspend stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpSuspendStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def suspendAndReconfigure(self, peer_id):
        """Sends the AVDTP command to input peer_id: suspend and reconfigure

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpSuspendAndReconfigure"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def releaseStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: release stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpReleaseStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def establishStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: establish stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpEstablishStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def startStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: start stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpStartStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def abortStream(self, peer_id):
        """Sends the AVDTP command to input peer_id: abort stream

        Args:
            peer_id: The peer id to send the AVDTP command to.

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpAbortStream"
        test_args = {"identifier": peer_id}

        return self.send_command(test_cmd, test_args)

    def removeService(self):
        """Removes the AVDTP service from the Fuchsia device

        Returns:
            Dictionary, None if success, error if error.
        """
        test_cmd = "avdtp_facade.AvdtpRemoveService"
        test_args = {}

        return self.send_command(test_cmd, test_args)