1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts.controllers.fuchsia_lib.base_lib import BaseLib
18
19
20class FuchsiaGattcLib(BaseLib):
21
22    def __init__(self, addr: str) -> None:
23        super().__init__(addr, "gatt_client")
24
25    def bleStartBleScan(self, scan_filter):
26        """Starts a BLE scan
27
28        Args:
29            scan_time_ms: int, Amount of time to scan for.
30            scan_filter: dictionary, Device filter for a scan.
31            scan_count: int, Number of devices to scan for before termination.
32
33        Returns:
34            None if pass, err if fail.
35        """
36        test_cmd = "gatt_client_facade.BleStartScan"
37        test_args = {
38            "filter": scan_filter,
39        }
40
41        return self.send_command(test_cmd, test_args)
42
43    def bleStopBleScan(self):
44        """Stops a BLE scan
45
46        Returns:
47            Dictionary, List of devices discovered, error string if error.
48        """
49        test_cmd = "gatt_client_facade.BleStopScan"
50        test_args = {}
51
52        return self.send_command(test_cmd, test_args)
53
54    def listServices(self, id):
55        """Lists services of a peripheral specified by id.
56
57        Args:
58            id: string, Peripheral identifier to list services.
59
60        Returns:
61            Dictionary, List of Service Info if success, error string if error.
62        """
63        test_cmd = "gatt_client_facade.GattcListServices"
64        test_args = {"identifier": id}
65
66        return self.send_command(test_cmd, test_args)
67
68    def bleGetDiscoveredDevices(self):
69        """Stops a BLE scan
70
71        Returns:
72            Dictionary, List of devices discovered, error string if error.
73        """
74        test_cmd = "gatt_client_facade.BleGetDiscoveredDevices"
75        test_args = {}
76
77        return self.send_command(test_cmd, test_args)
78
79    def discoverCharacteristics(self):
80        """Discover the characteristics of a connected service.
81
82        Returns:
83            Dictionary, List of Characteristics and Descriptors if success,
84            error string if error.
85        """
86        test_cmd = "gatt_client_facade.GattcDiscoverCharacteristics"
87        test_args = {}
88
89        return self.send_command(test_cmd, test_args)
90
91    def writeCharById(self, id, offset, write_value):
92        """Write Characteristic by id..
93
94        Args:
95            id: string, Characteristic identifier.
96            offset: int, The offset of bytes to write to.
97            write_value: byte array, The bytes to write.
98
99        Returns:
100            None if success, error string if error.
101        """
102        test_cmd = "gatt_client_facade.GattcWriteCharacteristicById"
103        test_args = {
104            "identifier": id,
105            "offset": offset,
106            "write_value": write_value,
107        }
108
109        return self.send_command(test_cmd, test_args)
110
111    def writeLongCharById(self, id, offset, write_value, reliable_mode=False):
112        """Write Characteristic by id.
113
114        Args:
115            id: string, Characteristic identifier.
116            offset: int, The offset of bytes to write to.
117            write_value: byte array, The bytes to write.
118            reliable_mode: bool value representing reliable writes.
119
120        Returns:
121            None if success, error string if error.
122        """
123        test_cmd = "gatt_client_facade.GattcWriteLongCharacteristicById"
124        test_args = {
125            "identifier": id,
126            "offset": offset,
127            "write_value": write_value,
128            "reliable_mode": reliable_mode
129        }
130
131        return self.send_command(test_cmd, test_args)
132
133    def writeLongDescById(self, id, offset, write_value):
134        """Write Descriptor by id.
135
136        Args:
137            id: string, Characteristic identifier.
138            offset: int, The offset of bytes to write to.
139            write_value: byte array, The bytes to write.
140
141        Returns:
142            None if success, error string if error.
143        """
144        test_cmd = "gatt_client_facade.GattcWriteLongDescriptorById"
145        test_args = {
146            "identifier": id,
147            "offset": offset,
148            "write_value": write_value,
149        }
150
151        return self.send_command(test_cmd, test_args)
152
153    def writeCharByIdWithoutResponse(self, id, write_value):
154        """Write Characteristic by id without response.
155
156        Args:
157            id: string, Characteristic identifier.
158            write_value: byte array, The bytes to write.
159
160        Returns:
161            None if success, error string if error.
162        """
163        test_cmd = "gatt_client_facade.GattcWriteCharacteristicByIdWithoutResponse"
164        test_args = {
165            "identifier": id,
166            "write_value": write_value,
167        }
168
169        return self.send_command(test_cmd, test_args)
170
171    def enableNotifyCharacteristic(self, id):
172        """Enable notifications on a Characteristic.
173
174        Args:
175            id: string, Characteristic identifier.
176
177        Returns:
178            None if success, error string if error.
179        """
180        test_cmd = "gatt_client_facade.GattcEnableNotifyCharacteristic"
181        test_args = {
182            "identifier": id,
183        }
184
185        return self.send_command(test_cmd, test_args)
186
187    def disableNotifyCharacteristic(self, id):
188        """Disable notifications on a Characteristic.
189
190        Args:
191            id: string, Characteristic identifier.
192
193        Returns:
194            None if success, error string if error.
195        """
196        test_cmd = "gatt_client_facade.GattcDisableNotifyCharacteristic"
197        test_args = {
198            "identifier": id,
199            "value": False,
200        }
201
202        return self.send_command(test_cmd, test_args)
203
204    def readCharacteristicById(self, id):
205        """Read Characteristic value by id..
206
207        Args:
208            id: string, Characteristic identifier.
209
210        Returns:
211            Characteristic value if success, error string if error.
212        """
213        test_cmd = "gatt_client_facade.GattcReadCharacteristicById"
214        test_args = {
215            "identifier": id,
216        }
217
218        return self.send_command(test_cmd, test_args)
219
220    def readCharacteristicByType(self, uuid):
221        """Read Characteristic value by id..
222
223        Args:
224            uuid: string, Characteristic identifier.
225
226        Returns:
227            Characteristic value if success, error string if error.
228        """
229        test_cmd = "gatt_client_facade.GattcReadCharacteristicByType"
230        test_args = {
231            "uuid": uuid,
232        }
233
234        return self.send_command(test_cmd, test_args)
235
236    def readDescriptorById(self, id):
237        """Read Descriptor value by id..
238
239        Args:
240            id: string, Descriptor identifier.
241
242        Returns:
243            Descriptor value if success, error string if error.
244        """
245        test_cmd = "gatt_client_facade.GattcReadDescriptorById"
246        test_args = {
247            "identifier": id,
248        }
249
250        return self.send_command(test_cmd, test_args)
251
252    def readLongDescriptorById(self, id, offset, max_bytes):
253        """Reads Long Descriptor value by id.
254
255        Args:
256            id: string, Descriptor identifier.
257            offset: int, The offset to start reading from.
258            max_bytes: int, The max bytes to return.
259
260        Returns:
261            Descriptor value if success, error string if error.
262        """
263        test_cmd = "gatt_client_facade.GattcReadLongDescriptorById"
264        test_args = {
265            "identifier": id,
266            "offset": offset,
267            "max_bytes": max_bytes
268        }
269
270        return self.send_command(test_cmd, test_args)
271
272    def writeDescriptorById(self, id, offset, write_value):
273        """Write Descriptor by id.
274
275        Args:
276            id: string, Descriptor identifier.
277            write_value: byte array, The bytes to write.
278
279        Returns:
280            None if success, error string if error.
281        """
282        test_cmd = "gatt_client_facade.GattcWriteDescriptorById"
283        test_args = {
284            "identifier": id,
285            "write_value": write_value,
286        }
287
288        return self.send_command(test_cmd, test_args)
289
290    def readLongCharacteristicById(self, id, offset, max_bytes):
291        """Reads Long Characteristic value by id.
292
293        Args:
294            id: string, Characteristic identifier.
295            offset: int, The offset to start reading from.
296            max_bytes: int, The max bytes to return.
297
298        Returns:
299            Characteristic value if success, error string if error.
300        """
301        test_cmd = "gatt_client_facade.GattcReadLongCharacteristicById"
302        test_args = {
303            "identifier": id,
304            "offset": offset,
305            "max_bytes": max_bytes
306        }
307
308        return self.send_command(test_cmd, test_args)
309
310    def connectToService(self, id, service_id):
311        """ Connect to a specific Service specified by id.
312
313        Args:
314            id: string, Service id.
315
316        Returns:
317            None if success, error string if error.
318        """
319        test_cmd = "gatt_client_facade.GattcConnectToService"
320        test_args = {"identifier": id, "service_identifier": service_id}
321
322        return self.send_command(test_cmd, test_args)
323
324    def bleConnectToPeripheral(self, id):
325        """Connects to a peripheral specified by id.
326
327        Args:
328            id: string, Peripheral identifier to connect to.
329
330        Returns:
331            Dictionary, List of Service Info if success, error string if error.
332        """
333        test_cmd = "gatt_client_facade.BleConnectPeripheral"
334        test_args = {"identifier": id}
335
336        return self.send_command(test_cmd, test_args)
337
338    def bleDisconnectPeripheral(self, id):
339        """Disconnects from a peripheral specified by id.
340
341        Args:
342            id: string, Peripheral identifier to disconnect from.
343
344        Returns:
345            Dictionary, None if success, error string if error.
346        """
347        test_cmd = "gatt_client_facade.BleDisconnectPeripheral"
348        test_args = {"identifier": id}
349
350        return self.send_command(test_cmd, test_args)
351