#!/usr/bin/env python3
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Script for sending data to a port.

This script provides a simple shell interface for sending data at run-time to a
port.

Usage:
    1. Choose a port to use. Use 'adb forward tcp:<port> tcp:<port>' to
       forward the port to the device.
    2. In a separate shell, build and push the test vendor library to the
       device using the script mentioned in option A (i.e. without the
       --test-channel flag set).
    3. Once logcat has started, turn Bluetooth on from the device.
    4. Run this program in the shell from step 1, passing the port (also from
       step 1) as the argument.

    scapy is the tool we use to build packets in Python.

    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) / Raw(load='\x01')
    >>> print(d)
    <HCI_Hdr type=Command |<HCI_Command_Hdr opcode=0x1004 |<Raw load='\x01' |>>>
    >>> raw(d)
    '\x01\x04\x10\x01\x01'
    >>> hexdump(d)
    0000  0104100101                        .....


    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
        L2CAP_CmdHdr(code=10, id=2, len=2) / L2CAP_InfoReq(type=2)
    >>> pkt
    <HCI_Hdr type=ACL Data |<HCI_ACL_Hdr handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
    len=6 cid=control |<L2CAP_CmdHdr code=info_req id=2 len=2 |<L2CAP_InfoReq
    type=FEAT_MASK |>>>>>
    >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
        len=10) / L2CAP_Hdr(len=6, cid='control') /
        L2CAP_CmdHdr(code='info_req', id=2, len=2) /
        L2CAP_InfoReq(type='FEAT_MASK')
    >>> raw(pkt)
    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
    >>> hexdump(pkt)
    0000  0202200A00060001000A0202000200    .. ............


"""

import binascii
import cmd
import queue
import random
import socket
import string
import struct
import sys
from scapy.all import *
# Imported after the star import so these names cannot be shadowed by it.
import threading
import time

# Add some more SCAPY stuff


class HCI_Cmd_Create_Connection(Packet):
    """Parameters of the HCI Create Connection command (bound to opcode 0x0405)."""
    name = 'Create Connection'
    fields_desc = [
        LEMACField('addr', None),
        LEShortField('packet_type', 8),
        ByteEnumField('page_scan_repetition_mode', 0, {
            0: 'R0',
            1: 'R1',
            2: 'R2'
        }),
        ByteEnumField('rsvd', 0, {0: 'Reserved'}),
        LEShortField('clock_offset', 0),
        ByteEnumField('allow_role_switch', 1, {
            0: 'false',
            1: 'true'
        }),
    ]


class HCI_Cmd_Inquiry(Packet):
    """Parameters of the HCI Inquiry command (bound to opcode 0x0401)."""
    name = 'Inquiry'
    fields_desc = [
        # NOTE(review): 0x9e8b0 has only five hex digits; the general inquiry
        # access code LAP is usually written 0x9e8b33 - confirm the intended
        # default.
        X3BytesField('LAP', 0x9e8b0),
        ByteField('length', 1),
        ByteField('max_responses', 0),
    ]


bind_layers(HCI_Command_Hdr, HCI_Cmd_Inquiry, opcode=0x0401)
bind_layers(HCI_Command_Hdr, HCI_Cmd_Create_Connection, opcode=0x0405)


class HCI_Event_Inquiry_Result(Packet):
    """Parameters of the Inquiry Result event (bound to event code 0x02)."""
    name = 'Inquiry Result'
    fields_desc = [
        ByteField('num_responses', 0),
        LEMACField('addr', None),
        ByteEnumField('page_scan_repetition_mode', 0, {
            0: 'R0',
            1: 'R1',
            2: 'R2'
        }),
        LEShortEnumField('rsvd', 0, {0: 'Reserved'}),
        X3BytesField('class_of_device', 0),
        LEShortField('clock_offset', 0),
    ]


class HCI_Event_Connection_Complete(Packet):
    """Parameters of the Connection Complete event (bound to event code 0x03)."""
    name = 'Connection Complete'
    fields_desc = [
        ByteField('status', 0),
        LEShortField('handle', 0xffff),
        LEMACField('addr', None),
        ByteField('link_type', 1),
        ByteField('encryption_mode', 0),
    ]


class HCI_Event_Remote_Name_Request_Complete(Packet):
    """Parameters of the Remote Name Request Complete event (event code 0x07)."""
    name = 'Remote Name Request Complete'
    fields_desc = [
        ByteField('status', 0),
        LEMACField('addr', None),
    ]


class HCI_Event_Read_Remote_Supported_Features_Complete(Packet):
    """Parameters of the Read Remote Supported Features Complete event (0x0b)."""
    name = 'Read Remote Supported Features Complete'
    fields_desc = [
        ByteField('status', 0),
        LEShortField('handle', 0xffff),
        XLELongField('features', 0x0123456789abcdef),
    ]


class HCI_Event_Read_Remote_Version_Information_Complete(Packet):
    """Parameters of the Read Remote Version Information Complete event (0x0c)."""
    name = 'Read Remote Version Information Complete'
    fields_desc = [
        ByteField('status', 0),
        LEShortField('handle', 0xffff),
        ByteField('version', 0),
        LEShortField('manufacturer_name', 0),
        LEShortField('subversion', 0),
    ]


class HCI_Event_Read_Clock_Offset_Complete(Packet):
    """Parameters of the Read Clock Offset Complete event (event code 0x1c)."""
    name = 'Read Clock Offset Complete'
    fields_desc = [
        ByteField('status', 0),
        LEShortField('handle', 0xffff),
        LEShortField('offset', 0xffff),
    ]


class HCI_Event_Read_Remote_Extended_Features_Complete(Packet):
    """Parameters of the Read Remote Extended Features Complete event (0x23)."""
    # The display name used to be a copy of the "Supported Features" event's
    # name, which made the two events indistinguishable in packet dumps.
    name = 'Read Remote Extended Features Complete'
    fields_desc = [
        ByteField('status', 0),
        LEShortField('handle', 0xffff),
        ByteField('page_number', 0),
        ByteField('max_page_number', 0),
        XLELongField('features', 0x0123456789abcdef),
    ]


class HCI_Event_Extended_Inquiry_Result(Packet):
    """Parameters of the Extended Inquiry Result event (event code 0x2f)."""
    name = 'Extended Inquiry Result'
    fields_desc = [
        ByteField('num_responses', 1),
        LEMACField('addr', None),
        ByteEnumField('page_scan_repetition_mode', 0, {
            0: 'R0',
            1: 'R1',
            2: 'R2'
        }),
        ByteEnumField('rsvd', 0, {0: 'Reserved'}),
        X3BytesField('class_of_device', 0),
        LEShortField('clock_offset', 0),
        SignedByteField('rssi', -20),
        # NOTE(review): the fourth positional argument of scapy's
        # PacketListField is presumably count_from, which is normally a
        # callable - confirm that the literal 1 is accepted when dissecting.
        PacketListField('extended_inquiry_response', [], EIR_Hdr, 1),
    ]


bind_layers(HCI_Event_Hdr, HCI_Event_Inquiry_Result, code=0x02)
bind_layers(HCI_Event_Hdr, HCI_Event_Connection_Complete, code=0x03)
bind_layers(HCI_Event_Hdr, HCI_Event_Remote_Name_Request_Complete, code=0x07)
bind_layers(HCI_Event_Hdr, HCI_Event_Read_Remote_Supported_Features_Complete, code=0x0b)
bind_layers(HCI_Event_Hdr, HCI_Event_Read_Remote_Version_Information_Complete, code=0x0c)
bind_layers(HCI_Event_Hdr, HCI_Event_Read_Clock_Offset_Complete, code=0x1c)
bind_layers(HCI_Event_Hdr, HCI_Event_Read_Remote_Extended_Features_Complete, code=0x23)
bind_layers(HCI_Event_Hdr, HCI_Event_Extended_Inquiry_Result, code=0x2f)

# END SCAPY stuff


def _is_bluetooth_address(address):
    """Return True if address looks like six colon-separated hex octets."""
    octets = address.split(':')
    if len(octets) != 6:
        return False
    try:
        return all(len(octet) == 2 and 0 <= int(octet, 16) <= 0xff for octet in octets)
    except ValueError:
        return False


class HCISocket(SuperSocket):
    """Simple wrapper class for a socket object.

    A background thread reads HCI packets from the socket, parses them with
    scapy, prints them and queues them for get_packet().

    Attributes:
      ins, outs: The underlying socket created for localhost and the specified
        port (the same object is used for both directions).
    """

    def __init__(self, port):
        """Connect to localhost:port and start the receive thread.

        Raises:
          OSError (socket.error): if the connection cannot be established.
        """
        self.done_ = False
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(('localhost', port))
        except Exception:
            # Do not leak the descriptor when the connection fails.
            s.close()
            raise
        self.ins = self.outs = s
        self.packets_ = queue.Queue()
        # Daemon thread: a receive thread blocked in recv() must not keep the
        # process alive when the shell exits without the quit command.
        self.rx_thread_ = threading.Thread(target=self.rx_thread_body, daemon=True)
        self.rx_thread_.start()

    def rx_bytes(self, size):
        """Read exactly size bytes from the socket.

        Returns:
          The bytes read, or b'' if the socket was closed or the receive
          thread was told to stop before size bytes arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0 and not self.done_:
            try:
                chunk = self.ins.recv(min(remaining, 2048))
            except OSError:
                chunk = b''
            if not chunk:
                # recv() returning nothing means the connection is gone; stop
                # instead of spinning on an empty read forever.
                if not self.done_:
                    print('Rx: connection closed')
                    self.done_ = True
                return b''
            chunks.append(chunk)
            remaining -= len(chunk)
        if remaining > 0:
            return b''
        return b''.join(chunks)

    def rx_thread_body(self):
        """Receive loop: read one HCI packet at a time and queue it."""
        while not self.done_:
            # Read the type
            type_byte = self.rx_bytes(1)
            if not type_byte:
                continue
            # Pick the header size for the packet type
            if type_byte == b'\x01':  # Command
                header_size = 3
            elif type_byte == b'\x02':  # ACL
                header_size = 4
            elif type_byte == b'\x03':  # SCO
                header_size = 3
            elif type_byte == b'\x04':  # Event
                header_size = 2
            else:
                # The stream cannot be resynchronized after an unknown type.
                self.done_ = True
                print('Rx: type_byte ' + hex(type_byte[0]))
                break
            # Read the Header
            header = self.rx_bytes(header_size)
            if not header:
                continue
            if type_byte == b'\x02':
                # ACL carries a 16-bit little-endian length.
                payload_length = header[2] | (header[3] << 8)
            else:
                # The other headers end with a one-byte length.
                payload_length = header[-1]
            # Read the Payload
            payload = self.rx_bytes(payload_length) if payload_length != 0 else b''
            if len(payload) != payload_length:
                continue  # Stopped in the middle of a packet.
            packet_bytes = type_byte + header + payload
            packet = HCI_Hdr(packet_bytes)
            print('Rx: ' + packet.__repr__())
            self.packets_.put(packet)

    def get_packet(self, timeout=None):
        """Return the next received packet, or False if none is available.

        Args:
          timeout: None (default) to return immediately; otherwise the maximum
            number of seconds to wait for a packet.
        """
        try:
            if timeout is None:
                return self.packets_.get_nowait()
            return self.packets_.get(timeout=max(timeout, 0))
        except queue.Empty:
            return False

    def send_binary(self, args):
        """Send raw bytes to the port.

        Args:
          args: Sequence of strings of hexadecimal digits, e.g.
            ['01', '03', '0c', '00']; they are concatenated and sent as is.

        Raises:
          ValueError: if the strings are not valid hexadecimal.
        """
        self.outs.sendall(bytes.fromhex(''.join(args)))

    def tell_rx_thread_to_quit(self):
        """Stop the receive thread and wait for it to finish.

        The socket is shut down so that a recv() the thread is blocked in
        returns; nothing can be sent or received afterwards.
        """
        self.done_ = True
        try:
            self.ins.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # Already disconnected.
        self.rx_thread_.join()


class HCIShell(cmd.Cmd):
    """Shell for sending binary data to a port.

    """

    def __init__(self, hci):
        cmd.Cmd.__init__(self)
        self._hci = hci

    def _wait_for_packet(self, matches, timeout):
        """Return the first received packet accepted by matches, or None.

        Packets that do not match are discarded. None is returned when no
        matching packet arrives within timeout seconds.
        """
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            packet = self._hci.get_packet(timeout=remaining)
            if packet is False:
                continue
            if matches(packet):
                return packet

    def do_send(self, args):
        """Arguments: hex bytes (e.g. 01 03 0c 00) Send the given bytes to the port.

        """
        hex_bytes = args.split()
        if not hex_bytes:
            print('Usage: send hex_bytes')
            return
        try:
            self._hci.send_binary(hex_bytes)
        except ValueError as e:
            print('Error parsing hex bytes: %s' % e)

    def do_connect(self, args):
        """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds)

        Sends Create Connection, waits for the command status and the
        connection complete event, then answers an L2CAP feature mask
        information request if one arrives. The timeout (default 2) applies to
        each of the three waits.
        """
        split_args = args.split()
        if not split_args or not _is_bluetooth_address(split_args[0]):
            print('Usage: connect xx:xx:xx:xx:xx:xx [timeout]')
            return
        address = split_args[0]
        try:
            timeout = int(split_args[1]) if len(split_args) > 1 else 2
        except ValueError:
            print('Error parsing timeout.')
            return

        connect = HCI_Hdr(type='Command') / HCI_Command_Hdr(opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address)
        self._hci.send(connect)

        event_type = HCI_Hdr(type='Event').type
        connect_opcode = connect[HCI_Command_Hdr].opcode

        def is_connect_status(packet):
            return (packet[HCI_Hdr].type == event_type and HCI_Event_Hdr in packet and
                    packet[HCI_Event_Hdr].code == 0xf and HCI_Event_Command_Status in packet and
                    packet[HCI_Event_Command_Status].opcode == connect_opcode)

        response = self._wait_for_packet(is_connect_status, timeout)
        if response is None:
            print('Timed out waiting for the command status.')
            return
        status = response[HCI_Event_Command_Status].status
        if status != HCI_Event_Command_Status(status='pending').status:
            print('Connection failed with status = ' + str(status))
            return

        def is_connection_complete(packet):
            return (packet[HCI_Hdr].type == event_type and HCI_Event_Hdr in packet and
                    packet[HCI_Event_Hdr].code == 0x3 and HCI_Event_Connection_Complete in packet)

        connection_complete = self._wait_for_packet(is_connection_complete, timeout)
        if connection_complete is None:
            print('Timed out waiting for the connection to complete.')
            return
        status = connection_complete[HCI_Event_Connection_Complete].status
        if status != 0:
            print('Connection complete with failed status = ' + str(status))
            return
        handle = connection_complete[HCI_Event_Connection_Complete].handle
        print('Connection established with handle ' + str(handle))
        connection_complete.show()
        hexdump(connection_complete)

        acl_type = HCI_Hdr(type='ACL Data').type
        control_cid = L2CAP_Hdr(cid='control').cid
        info_req_code = L2CAP_CmdHdr(code='info_req').code
        feat_mask_type = L2CAP_InfoReq(type='FEAT_MASK').type

        def is_feature_mask_request(packet):
            return (packet[HCI_Hdr].type == acl_type and HCI_ACL_Hdr in packet and L2CAP_Hdr in packet and
                    L2CAP_CmdHdr in packet and L2CAP_InfoReq in packet and
                    packet[L2CAP_Hdr].cid == control_cid and packet[L2CAP_CmdHdr].code == info_req_code and
                    packet[L2CAP_InfoReq].type == feat_mask_type)

        l2cap_req = self._wait_for_packet(is_feature_mask_request, timeout)
        if l2cap_req is None:
            # The peer did not ask for the feature mask; nothing to answer.
            return
        # Echo the identifier of the request so that the peer can match the
        # response to it.
        features = (HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
                    L2CAP_Hdr(len=12, cid='control') /
                    L2CAP_CmdHdr(code='info_resp', id=l2cap_req[L2CAP_CmdHdr].id, len=8) / L2CAP_InfoResp(
                        type=l2cap_req[L2CAP_InfoReq].type, result='success', data=b'\xb8\x00\x00\x00'))
        print('Send Features packet' + features.__repr__())
        self._hci.send(features)

    def do_le_scan(self, args):
        """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices

        """
        split_args = args.split()
        try:
            enable = int(split_args[0]) if len(split_args) > 0 else 1
            filter_dups = int(split_args[1]) if len(split_args) > 1 else 1
        except ValueError:
            print('Error parsing arguments.')
            return
        set_scan_parameters = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode=0x200b) / HCI_Cmd_LE_Set_Scan_Parameters(type=1)
        print('Tx: ' + set_scan_parameters.__repr__())
        self._hci.send(set_scan_parameters)
        set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable(
            enable=enable, filter_dups=filter_dups)
        print('Tx: ' + set_scan_enable.__repr__())
        self._hci.send(set_scan_enable)

    def do_scan(self, args):
        """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices

        """
        split_args = args.split()
        try:
            scan_time = int(split_args[0]) if len(split_args) > 0 else 0
            max_responses = int(split_args[1]) if len(split_args) > 1 else 0
        except ValueError:
            print('Error parsing arguments.')
            return
        inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr(opcode=0x0401) / HCI_Cmd_Inquiry(
            length=scan_time, max_responses=max_responses)
        print('Tx: ' + inquiry.__repr__())
        self._hci.send(inquiry)

    def do_wait(self, args):
        """Arguments: time in seconds (float).
        """
        try:
            sleep_time = float(args.split()[0])
        except (IndexError, ValueError):
            print('Usage: wait time_in_seconds')
            return
        if sleep_time < 0:
            print('Usage: wait time_in_seconds')
            return
        time.sleep(sleep_time)

    def do_quit(self, args):
        """Arguments: None.

        Exits.
        """
        self._hci.tell_rx_thread_to_quit()
        self._hci.close()
        print('Goodbye.')
        return True

    def do_help(self, args):
        """Arguments: [command] List the commands available, or show the help of one command.

        """
        cmd.Cmd.do_help(self, args)


def main(argv):
    """Parse the port from argv, connect to it and run the interactive shell."""
    if len(argv) != 2:
        print('Usage: python hci_socket.py [port]')
        return
    try:
        port = int(argv[1])
    except ValueError:
        print('Error parsing port.')
        return
    try:
        hci = HCISocket(port)
    except socket.error as e:
        print('Error connecting to socket: %s' % e)
        return
    except Exception as e:
        print('Error creating (check arguments): %s' % e)
        return
    hci_shell = HCIShell(hci)
    hci_shell.prompt = '$ '
    hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' + 'Type \'help\' for more information.')


if __name__ == '__main__':
    main(sys.argv)