1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import time
18
19from acts.keys import Config
20from acts.utils import rand_ascii_str
21from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings
22from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic
23from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic_value_format
24from acts_contrib.test_utils.bt.bt_constants import gatt_cb_err
25from acts_contrib.test_utils.bt.bt_constants import gatt_transport
26from acts_contrib.test_utils.bt.bt_constants import gatt_event
27from acts_contrib.test_utils.bt.bt_constants import gatt_server_responses
28from acts_contrib.test_utils.bt.bt_constants import gatt_service_types
29from acts_contrib.test_utils.bt.bt_constants import small_timeout
30from acts_contrib.test_utils.bt.gatt_test_database import STRING_512BYTES
31
32from acts.utils import exe_cmd
33from math import ceil
34
35
class GattServerLib():
    """Helper that builds and drives a GATT server through ``dut.droid``.

    The object keeps track of the server/callback handles it opened and of
    the services, characteristics and descriptors it created, plus the
    values accumulated from prepared (queued) writes.
    """

    # Class-level defaults are kept so existing code that reads these names
    # straight off the class keeps working. The mutable ones are re-bound per
    # instance in __init__; otherwise every instance would append to the very
    # same list/dict objects.
    characteristic_list = []
    default_timeout = 10
    descriptor_list = []
    dut = None
    gatt_server = None
    gatt_server_callback = None
    gatt_server_list = []
    log = None
    service_list = []
    write_mapping = {}

    def __init__(self, log, dut):
        """Store the logger and device and start with empty tracking state.

        Args:
            log: Logger used for this helper's own messages.
            dut: Device object; its ``droid`` attribute receives the GATT
                server calls.
        """
        self.dut = dut
        self.log = log
        # Fresh containers per instance so two helpers never share state.
        self.characteristic_list = []
        self.descriptor_list = []
        self.gatt_server_list = []
        self.service_list = []
        self.write_mapping = {}
52
53    def list_all_uuids(self):
54        """From the GATT Client, discover services and list all services,
55        chars and descriptors.
56        """
57        self.log.info("Service List:")
58        for service in self.dut.droid.gattGetServiceUuidList(self.gatt_server):
59            self.dut.log.info("GATT Server service uuid: {}".format(service))
60        self.log.info("Characteristics List:")
61        for characteristic in self.characteristic_list:
62            instance_id = self.dut.droid.gattServerGetCharacteristicInstanceId(
63                characteristic)
64            uuid = self.dut.droid.gattServerGetCharacteristicUuid(
65                characteristic)
66            self.dut.log.info(
67                "GATT Server characteristic handle uuid: {} {}".format(
68                    hex(instance_id), uuid))
69        # TODO: add getting insance ids and uuids from each descriptor.
70
71    def open(self):
72        """Open an empty GATT Server instance"""
73        self.gatt_server_callback = self.dut.droid.gattServerCreateGattServerCallback(
74        )
75        self.gatt_server = self.dut.droid.gattServerOpenGattServer(
76            self.gatt_server_callback)
77        self.gatt_server_list.append(self.gatt_server)
78
79    def clear_services(self):
80        """Clear BluetoothGattServices from BluetoothGattServer"""
81        self.dut.droid.gattServerClearServices(self.gatt_server)
82
83    def close_bluetooth_gatt_servers(self):
84        """Close Bluetooth Gatt Servers"""
85        try:
86            for btgs in self.gatt_server_list:
87                self.dut.droid.gattServerClose(btgs)
88        except Exception as err:
89            self.log.error(
90                "Failed to close Bluetooth GATT Servers: {}".format(err))
91        self.characteristic_list = []
92        self.descriptor_list = []
93        self.gatt_server_list = []
94        self.service_list = []
95
96    def characteristic_set_value_by_instance_id(self, instance_id, value):
97        """Set Characteristic value by instance id"""
98        self.dut.droid.gattServerCharacteristicSetValueByInstanceId(
99            int(instance_id, 16), value)
100
101    def notify_characteristic_changed(self, instance_id, confirm):
102        """ Notify characteristic changed """
103        self.dut.droid.gattServerNotifyCharacteristicChangedByInstanceId(
104            self.gatt_server, 0, int(instance_id, 16), confirm)
105
106    def send_response(self, user_input):
107        """Send a single response to the GATT Client"""
108        args = user_input.split()
109        mtu = 23
110        if len(args) == 2:
111            user_input = args[0]
112            mtu = int(args[1])
113        desc_read = gatt_event['desc_read_req']['evt'].format(
114            self.gatt_server_callback)
115        desc_write = gatt_event['desc_write_req']['evt'].format(
116            self.gatt_server_callback)
117        char_read = gatt_event['char_read_req']['evt'].format(
118            self.gatt_server_callback)
119        char_write_req = gatt_event['char_write_req']['evt'].format(
120            self.gatt_server_callback)
121        char_write = gatt_event['char_write']['evt'].format(
122            self.gatt_server_callback)
123        execute_write = gatt_event['exec_write']['evt'].format(
124            self.gatt_server_callback)
125        regex = "({}|{}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
126                                             char_write, execute_write,
127                                             char_write_req)
128        events = self.dut.ed.pop_events(regex, 5, small_timeout)
129        status = 0
130        if user_input:
131            status = gatt_server_responses.get(user_input)
132        for event in events:
133            self.log.debug("Found event: {}.".format(event))
134            request_id = event['data']['requestId']
135            if event['name'] == execute_write:
136                if ('execute' in event['data']
137                        and event['data']['execute'] == True):
138                    for key in self.write_mapping:
139                        value = self.write_mapping[key]
140                        self.log.info("Writing key, value: {}, {}".format(
141                            key, value))
142                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
143                            key, value)
144                else:
145                    self.log.info("Execute result is false")
146                self.write_mapping = {}
147                self.dut.droid.gattServerSendResponse(self.gatt_server, 0,
148                                                      request_id, status, 0,
149                                                      [])
150                continue
151            offset = event['data']['offset']
152            instance_id = event['data']['instanceId']
153            if (event['name'] == desc_write or event['name'] == char_write
154                    or event['name'] == char_write_req):
155                if ('preparedWrite' in event['data']
156                        and event['data']['preparedWrite'] == True):
157                    value = event['data']['value']
158                    if instance_id in self.write_mapping.keys():
159                        self.write_mapping[instance_id] = self.write_mapping[
160                            instance_id] + value
161                        self.log.info(
162                            "New Prepared Write Value for {}: {}".format(
163                                instance_id, self.write_mapping[instance_id]))
164                    else:
165                        self.log.info("write mapping key, value {}, {}".format(
166                            instance_id, value))
167                        self.write_mapping[instance_id] = value
168                        self.log.info("current value {}, {}".format(
169                            instance_id, value))
170                    self.dut.droid.gattServerSendResponse(
171                        self.gatt_server, 0, request_id, status, 0, value)
172                    continue
173                else:
174                    self.dut.droid.gattServerSetByteArrayValueByInstanceId(
175                        event['data']['instanceId'], event['data']['value'])
176
177            try:
178                data = self.dut.droid.gattServerGetReadValueByInstanceId(
179                    int(event['data']['instanceId']))
180            except Exception as err:
181                self.log.error(err)
182            if not data:
183                data = [1]
184            self.log.info(
185                "GATT Server Send Response [request_id, status, offset, data]" \
186                " [{}, {}, {}, {}]".
187                format(request_id, status, offset, data))
188            data = data[offset:offset + mtu - 1]
189            self.dut.droid.gattServerSendResponse(self.gatt_server, 0,
190                                                  request_id, status, offset,
191                                                  data)
192
193    def _setup_service(self, serv):
194        service = self.dut.droid.gattServerCreateService(
195            serv['uuid'], serv['type'])
196        if 'handles' in serv:
197            self.dut.droid.gattServerServiceSetHandlesToReserve(
198                service, serv['handles'])
199        return service
200
201    def _setup_characteristic(self, char):
202        characteristic = \
203            self.dut.droid.gattServerCreateBluetoothGattCharacteristic(
204                char['uuid'], char['properties'], char['permissions'])
205        if 'instance_id' in char:
206            self.dut.droid.gattServerCharacteristicSetInstanceId(
207                characteristic, char['instance_id'])
208            set_id = self.dut.droid.gattServerCharacteristicGetInstanceId(
209                characteristic)
210            if set_id != char['instance_id']:
211                self.log.error(
212                    "Instance ID did not match up. Found {} Expected {}".
213                    format(set_id, char['instance_id']))
214        if 'value_type' in char:
215            value_type = char['value_type']
216            value = char['value']
217            if value_type == gatt_characteristic_value_format['string']:
218                self.log.info("Set String value result: {}".format(
219                    self.dut.droid.gattServerCharacteristicSetStringValue(
220                        characteristic, value)))
221            elif value_type == gatt_characteristic_value_format['byte']:
222                self.log.info("Set Byte Array value result: {}".format(
223                    self.dut.droid.gattServerCharacteristicSetByteValue(
224                        characteristic, value)))
225            else:
226                self.log.info("Set Int value result: {}".format(
227                    self.dut.droid.gattServerCharacteristicSetIntValue(
228                        characteristic, value, value_type, char['offset'])))
229        return characteristic
230
231    def _setup_descriptor(self, desc):
232        descriptor = self.dut.droid.gattServerCreateBluetoothGattDescriptor(
233            desc['uuid'], desc['permissions'])
234        if 'value' in desc:
235            self.dut.droid.gattServerDescriptorSetByteValue(
236                descriptor, desc['value'])
237        if 'instance_id' in desc:
238            self.dut.droid.gattServerDescriptorSetInstanceId(
239                descriptor, desc['instance_id'])
240        self.descriptor_list.append(descriptor)
241        return descriptor
242
243    def setup_gatts_db(self, database):
244        """Setup GATT Server database"""
245        self.gatt_server_callback = \
246            self.dut.droid.gattServerCreateGattServerCallback()
247        self.gatt_server = self.dut.droid.gattServerOpenGattServer(
248            self.gatt_server_callback)
249        self.gatt_server_list.append(self.gatt_server)
250        for serv in database['services']:
251            service = self._setup_service(serv)
252            self.service_list.append(service)
253            if 'characteristics' in serv:
254                for char in serv['characteristics']:
255                    characteristic = self._setup_characteristic(char)
256                    if 'descriptors' in char:
257                        for desc in char['descriptors']:
258                            descriptor = self._setup_descriptor(desc)
259                            self.dut.droid.gattServerCharacteristicAddDescriptor(
260                                characteristic, descriptor)
261                    self.characteristic_list.append(characteristic)
262                    self.dut.droid.gattServerAddCharacteristicToService(
263                        service, characteristic)
264            self.dut.droid.gattServerAddService(self.gatt_server, service)
265            expected_event = gatt_cb_strings['serv_added'].format(
266                self.gatt_server_callback)
267            self.dut.ed.pop_event(expected_event, 10)
268        return self.gatt_server, self.gatt_server_callback
269
270    def send_continuous_response(self, user_input):
271        """Send the same response"""
272        desc_read = gatt_event['desc_read_req']['evt'].format(
273            self.gatt_server_callback)
274        desc_write = gatt_event['desc_write_req']['evt'].format(
275            self.gatt_server_callback)
276        char_read = gatt_event['char_read_req']['evt'].format(
277            self.gatt_server_callback)
278        char_write = gatt_event['char_write']['evt'].format(
279            self.gatt_server_callback)
280        execute_write = gatt_event['exec_write']['evt'].format(
281            self.gatt_server_callback)
282        regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
283                                          char_write, execute_write)
284        offset = 0
285        status = 0
286        mtu = 23
287        char_value = []
288        for i in range(512):
289            char_value.append(i % 256)
290        len_min = 470
291        end_time = time.time() + 180
292        i = 0
293        num_packets = ceil((len(char_value) + 1) / (mtu - 1))
294        while time.time() < end_time:
295            events = self.dut.ed.pop_events(regex, 10, small_timeout)
296            for event in events:
297                start_offset = i * (mtu - 1)
298                i += 1
299                self.log.debug("Found event: {}.".format(event))
300                request_id = event['data']['requestId']
301                data = char_value[start_offset:start_offset + mtu - 1]
302                if not data:
303                    data = [1]
304                self.log.debug(
305                    "GATT Server Send Response [request_id, status, offset, " \
306                    "data] [{}, {}, {}, {}]".format(request_id, status, offset,
307                        data))
308                self.dut.droid.gattServerSendResponse(self.gatt_server, 0,
309                                                      request_id, status,
310                                                      offset, data)
311
312    def send_continuous_response_data(self, user_input):
313        """Send the same response with data"""
314        desc_read = gatt_event['desc_read_req']['evt'].format(
315            self.gatt_server_callback)
316        desc_write = gatt_event['desc_write_req']['evt'].format(
317            self.gatt_server_callback)
318        char_read = gatt_event['char_read_req']['evt'].format(
319            self.gatt_server_callback)
320        char_write = gatt_event['char_write']['evt'].format(
321            self.gatt_server_callback)
322        execute_write = gatt_event['exec_write']['evt'].format(
323            self.gatt_server_callback)
324        regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
325                                          char_write, execute_write)
326        offset = 0
327        status = 0
328        mtu = 11
329        char_value = []
330        len_min = 470
331        end_time = time.time() + 180
332        i = 0
333        num_packets = ceil((len(char_value) + 1) / (mtu - 1))
334        while time.time() < end_time:
335            events = self.dut.ed.pop_events(regex, 10, small_timeout)
336            for event in events:
337                self.log.info(event)
338                request_id = event['data']['requestId']
339                if event['name'] == execute_write:
340                    if ('execute' in event['data']
341                            and event['data']['execute'] == True):
342                        for key in self.write_mapping:
343                            value = self.write_mapping[key]
344                            self.log.debug("Writing key, value: {}, {}".format(
345                                key, value))
346                            self.dut.droid.gattServerSetByteArrayValueByInstanceId(
347                                key, value)
348                        self.write_mapping = {}
349                    self.dut.droid.gattServerSendResponse(
350                        self.gatt_server, 0, request_id, status, 0, [1])
351                    continue
352                offset = event['data']['offset']
353                instance_id = event['data']['instanceId']
354                if (event['name'] == desc_write
355                        or event['name'] == char_write):
356                    if ('preparedWrite' in event['data']
357                            and event['data']['preparedWrite'] == True):
358                        value = event['data']['value']
359                        if instance_id in self.write_mapping:
360                            self.write_mapping[
361                                instance_id] = self.write_mapping[
362                                    instance_id] + value
363                        else:
364                            self.write_mapping[instance_id] = value
365                    else:
366                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
367                            event['data']['instanceId'],
368                            event['data']['value'])
369                try:
370                    data = self.dut.droid.gattServerGetReadValueByInstanceId(
371                        int(event['data']['instanceId']))
372                except Exception as err:
373                    self.log.error(err)
374                if not data:
375                    self.dut.droid.gattServerSendResponse(
376                        self.gatt_server, 0, request_id, status, offset, [1])
377                else:
378                    self.dut.droid.gattServerSendResponse(
379                        self.gatt_server, 0, request_id, status, offset,
380                        data[offset:offset + 17])
381