#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import time

from acts.keys import Config
from acts.utils import rand_ascii_str
from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings
from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic
from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic_value_format
from acts_contrib.test_utils.bt.bt_constants import gatt_cb_err
from acts_contrib.test_utils.bt.bt_constants import gatt_transport
from acts_contrib.test_utils.bt.bt_constants import gatt_event
from acts_contrib.test_utils.bt.bt_constants import gatt_server_responses
from acts_contrib.test_utils.bt.bt_constants import gatt_service_types
from acts_contrib.test_utils.bt.bt_constants import small_timeout
from acts_contrib.test_utils.bt.gatt_test_database import STRING_512BYTES

from acts.utils import exe_cmd
from math import ceil


class GattServerLib():
    """Drives a GATT server on a device through its `droid` RPC interface.

    The object keeps track of the server/callback handles it opened and of the
    service, characteristic and descriptor handles it created, and answers
    incoming GATT requests that are reported through the device's event
    dispatcher (`dut.ed`).
    """

    # Class-level defaults are retained so that attribute lookups on the class
    # itself keep working. The mutable containers are rebound per instance in
    # __init__ so that two GattServerLib objects never share state.
    characteristic_list = []
    default_timeout = 10
    descriptor_list = []
    dut = None
    gatt_server = None
    gatt_server_callback = None
    gatt_server_list = []
    log = None
    service_list = []
    write_mapping = {}

    def __init__(self, log, dut):
        """Bind the helper to a logger and a device under test.

        Args:
            log: Logger used for this helper's own messages.
            dut: Device object exposing `droid`, `ed` and `log`.
        """
        self.dut = dut
        self.log = log
        # Fresh containers per instance; the class-level ones would otherwise
        # be mutated in place and shared by every instance.
        self.characteristic_list = []
        self.descriptor_list = []
        self.gatt_server_list = []
        self.service_list = []
        self.write_mapping = {}

    def _event_name(self, key):
        """Return the event name for `key` bound to this server's callback."""
        return gatt_event[key]['evt'].format(self.gatt_server_callback)

    def _read_value_or_none(self, instance_id):
        """Read the stored value for `instance_id`; return None on failure.

        Failures are logged rather than raised so that the caller can still
        answer the pending request with a fallback payload.
        """
        try:
            return self.dut.droid.gattServerGetReadValueByInstanceId(
                int(instance_id))
        except Exception as err:
            self.log.error(err)
            return None

    def list_all_uuids(self):
        """Log the server's service UUIDs and each known characteristic.

        For every characteristic created through this object the instance id
        (in hex) and UUID are logged.
        """
        self.log.info("Service List:")
        for service in self.dut.droid.gattGetServiceUuidList(self.gatt_server):
            self.dut.log.info("GATT Server service uuid: {}".format(service))
        self.log.info("Characteristics List:")
        for characteristic in self.characteristic_list:
            instance_id = self.dut.droid.gattServerGetCharacteristicInstanceId(
                characteristic)
            uuid = self.dut.droid.gattServerGetCharacteristicUuid(
                characteristic)
            self.dut.log.info(
                "GATT Server characteristic handle uuid: {} {}".format(
                    hex(instance_id), uuid))
        # TODO: add getting instance ids and uuids from each descriptor.

    def open(self):
        """Open an empty GATT Server instance and remember its handles."""
        self.gatt_server_callback = (
            self.dut.droid.gattServerCreateGattServerCallback())
        self.gatt_server = self.dut.droid.gattServerOpenGattServer(
            self.gatt_server_callback)
        self.gatt_server_list.append(self.gatt_server)

    def clear_services(self):
        """Clear BluetoothGattServices from BluetoothGattServer"""
        self.dut.droid.gattServerClearServices(self.gatt_server)

    def close_bluetooth_gatt_servers(self):
        """Close every GATT server opened by this object and reset tracking.

        Closing is best effort: a failure is logged and the bookkeeping lists
        are reset regardless.
        """
        try:
            for btgs in self.gatt_server_list:
                self.dut.droid.gattServerClose(btgs)
        except Exception as err:
            self.log.error(
                "Failed to close Bluetooth GATT Servers: {}".format(err))
        self.characteristic_list = []
        self.descriptor_list = []
        self.gatt_server_list = []
        self.service_list = []

    def characteristic_set_value_by_instance_id(self, instance_id, value):
        """Set a characteristic value.

        Args:
            instance_id: Instance id as a hexadecimal string.
            value: Value forwarded unchanged to the device.
        """
        self.dut.droid.gattServerCharacteristicSetValueByInstanceId(
            int(instance_id, 16), value)

    def notify_characteristic_changed(self, instance_id, confirm):
        """Notify that a characteristic changed.

        Args:
            instance_id: Instance id as a hexadecimal string.
            confirm: Forwarded unchanged to the device call.
        """
        # NOTE(review): the literal 0 is presumably the connected-device
        # index expected by the RPC - confirm against the facade.
        self.dut.droid.gattServerNotifyCharacteristicChangedByInstanceId(
            self.gatt_server, 0, int(instance_id, 16), confirm)

    def send_response(self, user_input):
        """Answer the pending GATT requests once.

        Up to 5 matching events are popped and each one is answered.
        Prepared writes are accumulated in `write_mapping` and committed when
        an execute-write event with `execute == True` arrives.

        Args:
            user_input: Optional string. Either a response name looked up in
                `gatt_server_responses`, or "<name> <mtu>" to additionally
                override the MTU (default 23) used to size read responses.
                None or an empty string yields status 0.
        """
        args = user_input.split() if user_input else []
        mtu = 23
        if len(args) == 2:
            user_input = args[0]
            mtu = int(args[1])
        desc_read = self._event_name('desc_read_req')
        desc_write = self._event_name('desc_write_req')
        char_read = self._event_name('char_read_req')
        char_write_req = self._event_name('char_write_req')
        char_write = self._event_name('char_write')
        execute_write = self._event_name('exec_write')
        regex = "({}|{}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
                                             char_write, execute_write,
                                             char_write_req)
        write_events = (desc_write, char_write, char_write_req)
        events = self.dut.ed.pop_events(regex, 5, small_timeout)
        status = 0
        if user_input:
            status = gatt_server_responses.get(user_input)
        for event in events:
            self.log.debug("Found event: {}.".format(event))
            payload = event['data']
            request_id = payload['requestId']
            if event['name'] == execute_write:
                # Explicit comparison kept on purpose: a truthiness test
                # would also accept non-boolean payload values.
                if payload.get('execute') == True:  # noqa: E712
                    for key, value in self.write_mapping.items():
                        self.log.info("Writing key, value: {}, {}".format(
                            key, value))
                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                            key, value)
                else:
                    self.log.info("Execute result is false")
                self.write_mapping = {}
                self.dut.droid.gattServerSendResponse(self.gatt_server, 0,
                                                      request_id, status, 0,
                                                      [])
                continue
            offset = payload['offset']
            instance_id = payload['instanceId']
            if event['name'] in write_events:
                if payload.get('preparedWrite') == True:  # noqa: E712
                    value = payload['value']
                    if instance_id in self.write_mapping:
                        self.write_mapping[instance_id] = (
                            self.write_mapping[instance_id] + value)
                        self.log.info(
                            "New Prepared Write Value for {}: {}".format(
                                instance_id, self.write_mapping[instance_id]))
                    else:
                        self.log.info("write mapping key, value {}, {}".format(
                            instance_id, value))
                        self.write_mapping[instance_id] = value
                        self.log.info("current value {}, {}".format(
                            instance_id, value))
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, 0, value)
                    continue
                self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                    instance_id, payload['value'])

            # A failed read yields None, which falls through to the [1]
            # placeholder instead of reusing a previous event's data.
            data = self._read_value_or_none(instance_id)
            if not data:
                data = [1]
            self.log.info(
                "GATT Server Send Response [request_id, status, offset, data]"
                " [{}, {}, {}, {}]".format(request_id, status, offset, data))
            data = data[offset:offset + mtu - 1]
            self.dut.droid.gattServerSendResponse(self.gatt_server, 0,
                                                  request_id, status, offset,
                                                  data)

    def _setup_service(self, serv):
        """Create a service from a database entry and return its handle.

        Args:
            serv: Dict with 'uuid' and 'type', and optionally 'handles'.
        """
        service = self.dut.droid.gattServerCreateService(
            serv['uuid'], serv['type'])
        if 'handles' in serv:
            self.dut.droid.gattServerServiceSetHandlesToReserve(
                service, serv['handles'])
        return service

    def _setup_characteristic(self, char):
        """Create a characteristic from a database entry; return its handle.

        Args:
            char: Dict with 'uuid', 'properties' and 'permissions'. Optional
                'instance_id' is applied and verified. Optional 'value_type'
                (with 'value', and 'offset' for integer formats) sets the
                initial value.
        """
        droid = self.dut.droid
        characteristic = droid.gattServerCreateBluetoothGattCharacteristic(
            char['uuid'], char['properties'], char['permissions'])
        if 'instance_id' in char:
            droid.gattServerCharacteristicSetInstanceId(
                characteristic, char['instance_id'])
            set_id = droid.gattServerCharacteristicGetInstanceId(
                characteristic)
            if set_id != char['instance_id']:
                self.log.error(
                    "Instance ID did not match up. Found {} Expected {}".
                    format(set_id, char['instance_id']))
        if 'value_type' in char:
            value_type = char['value_type']
            value = char['value']
            if value_type == gatt_characteristic_value_format['string']:
                self.log.info("Set String value result: {}".format(
                    droid.gattServerCharacteristicSetStringValue(
                        characteristic, value)))
            elif value_type == gatt_characteristic_value_format['byte']:
                self.log.info("Set Byte Array value result: {}".format(
                    droid.gattServerCharacteristicSetByteValue(
                        characteristic, value)))
            else:
                # Every other format is handled as an integer format.
                self.log.info("Set Int value result: {}".format(
                    droid.gattServerCharacteristicSetIntValue(
                        characteristic, value, value_type, char['offset'])))
        return characteristic

    def _setup_descriptor(self, desc):
        """Create a descriptor from a database entry, track and return it.

        Args:
            desc: Dict with 'uuid' and 'permissions', and optionally 'value'
                and 'instance_id'.
        """
        descriptor = self.dut.droid.gattServerCreateBluetoothGattDescriptor(
            desc['uuid'], desc['permissions'])
        if 'value' in desc:
            self.dut.droid.gattServerDescriptorSetByteValue(
                descriptor, desc['value'])
        if 'instance_id' in desc:
            self.dut.droid.gattServerDescriptorSetInstanceId(
                descriptor, desc['instance_id'])
        self.descriptor_list.append(descriptor)
        return descriptor

    def setup_gatts_db(self, database):
        """Open a GATT server and populate it from `database`.

        Each service is added to the server and the matching service-added
        event is awaited (10 second timeout) before the next one is set up.

        Args:
            database: Dict with a 'services' list. Each service may carry a
                'characteristics' list, and each characteristic may carry a
                'descriptors' list.

        Returns:
            Tuple of (gatt_server, gatt_server_callback) handles.
        """
        self.open()
        droid = self.dut.droid
        for serv in database['services']:
            service = self._setup_service(serv)
            self.service_list.append(service)
            for char in serv.get('characteristics', ()):
                characteristic = self._setup_characteristic(char)
                for desc in char.get('descriptors', ()):
                    descriptor = self._setup_descriptor(desc)
                    droid.gattServerCharacteristicAddDescriptor(
                        characteristic, descriptor)
                self.characteristic_list.append(characteristic)
                droid.gattServerAddCharacteristicToService(
                    service, characteristic)
            droid.gattServerAddService(self.gatt_server, service)
            expected_event = gatt_cb_strings['serv_added'].format(
                self.gatt_server_callback)
            self.dut.ed.pop_event(expected_event, 10)
        return self.gatt_server, self.gatt_server_callback

    def send_continuous_response(self, user_input):
        """Answer every incoming request for 180 seconds with canned data.

        The payload is a 512-entry list (values 0..255 repeated) handed out
        in consecutive chunks of 22 entries, one chunk per event. Once the
        list is exhausted the placeholder [1] is sent. Status and offset in
        the response are always 0.

        Args:
            user_input: Unused; kept for interface compatibility.
        """
        desc_read = self._event_name('desc_read_req')
        desc_write = self._event_name('desc_write_req')
        char_read = self._event_name('char_read_req')
        char_write = self._event_name('char_write')
        execute_write = self._event_name('exec_write')
        regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
                                          char_write, execute_write)
        offset = 0
        status = 0
        mtu = 23
        chunk_size = mtu - 1
        char_value = [n % 256 for n in range(512)]
        end_time = time.time() + 180
        packet_index = 0
        while time.time() < end_time:
            events = self.dut.ed.pop_events(regex, 10, small_timeout)
            for event in events:
                start_offset = packet_index * chunk_size
                packet_index += 1
                self.log.debug("Found event: {}.".format(event))
                request_id = event['data']['requestId']
                data = char_value[start_offset:start_offset + chunk_size]
                if not data:
                    data = [1]
                self.log.debug(
                    "GATT Server Send Response [request_id, status, offset, "
                    "data] [{}, {}, {}, {}]".format(request_id, status, offset,
                                                    data))
                self.dut.droid.gattServerSendResponse(self.gatt_server, 0,
                                                      request_id, status,
                                                      offset, data)

    def send_continuous_response_data(self, user_input):
        """Answer every incoming request for 180 seconds with stored values.

        Writes are stored (prepared writes are accumulated in `write_mapping`
        and committed on execute-write). Every non-execute request is then
        answered with up to 17 entries of the stored value starting at the
        request offset, or with the placeholder [1] if nothing is stored or
        the read fails.

        Args:
            user_input: Unused; kept for interface compatibility.
        """
        desc_read = self._event_name('desc_read_req')
        desc_write = self._event_name('desc_write_req')
        char_read = self._event_name('char_read_req')
        char_write = self._event_name('char_write')
        execute_write = self._event_name('exec_write')
        regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
                                          char_write, execute_write)
        status = 0
        end_time = time.time() + 180
        while time.time() < end_time:
            events = self.dut.ed.pop_events(regex, 10, small_timeout)
            for event in events:
                self.log.info(event)
                payload = event['data']
                request_id = payload['requestId']
                if event['name'] == execute_write:
                    # Explicit comparison kept on purpose: a truthiness test
                    # would also accept non-boolean payload values.
                    if payload.get('execute') == True:  # noqa: E712
                        for key, value in self.write_mapping.items():
                            self.log.debug("Writing key, value: {}, {}".format(
                                key, value))
                            self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                                key, value)
                    self.write_mapping = {}
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, 0, [1])
                    continue
                offset = payload['offset']
                instance_id = payload['instanceId']
                if event['name'] in (desc_write, char_write):
                    if payload.get('preparedWrite') == True:  # noqa: E712
                        value = payload['value']
                        if instance_id in self.write_mapping:
                            self.write_mapping[instance_id] = (
                                self.write_mapping[instance_id] + value)
                        else:
                            self.write_mapping[instance_id] = value
                    else:
                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                            instance_id, payload['value'])
                # A failed read yields None, so the placeholder is sent
                # instead of reusing a previous event's data.
                data = self._read_value_or_none(instance_id)
                if not data:
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, offset, [1])
                else:
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, offset,
                        data[offset:offset + 17])