1#
2# Copyright 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Script for sending data to a port.
17
18This script provides a simple shell interface for sending data at run-time to a
19port.
20
21Usage:
22    1. Choose a port to use. Use 'adb forward tcp:<port>
23    tcp:<port>' to forward the port to the device.
24    2. In a separate shell, build and push the test vendor library to the device
25    using the script mentioned in option A (i.e. without the --test-channel flag
26    set).
27    3. Once logcat has started, turn Bluetooth on from the device.
28    4. Run this program, in the shell from step 1,  the port, also from step 1,
29    as arguments.
30
31    scapy is the tool we use to build packets in Python.
32
33    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
34    Raw(load='\x01')
35    >>> print(d)
36    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
37    |>>>
38    >>> raw(d)
39    '\x01\x04\x10\x01\x01'
40    >>> hexdump(d)
41    0000  0104100101                       .....
42
43
44    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
45    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
46    >>> pkt
47    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
48    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
49    type=FEAT_MASK |>>>>>
50    >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
51    len=10) / L2CAP_Hdr(len=6, cid='control') / L2CAP_CmdHdr(code='info_req',
52    id=2, len=2) / L2CAP_InfoReq(type='FEAT_MASK')
53    >>> raw(pkt)
54    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
55    >>> hexdump(pkt)
56    0000  0202200A00060001000A0202000200   .. ............
57
58
59"""
60
61#!/usr/bin/env python3
62
63import binascii
64import cmd
65import queue
66import random
67import socket
68import string
69import struct
70import sys
71from scapy.all import *
72import time
73""" Add some more SCAPY stuff"""
74
75
76class HCI_Cmd_Create_Connection(Packet):
77    name = 'Create Connection'
78    fields_desc = [
79        LEMACField('addr', None),
80        LEShortField('packet_type', 8),
81        ByteEnumField('page_scan_repetition_mode', 0, {
82            0: 'R0',
83            1: 'R1',
84            2: 'R2'
85        }),
86        ByteEnumField('rsvd', 0, {0: 'Reserved'}),
87        LEShortField('clock_offset', 0),
88        ByteEnumField('allow_role_switch', 1, {
89            0: 'false',
90            1: 'true'
91        }),
92    ]
93
94
95class HCI_Cmd_Inquiry(Packet):
96    name = 'Inquiry'
97    fields_desc = [
98        X3BytesField('LAP', 0x9e8b0),
99        ByteField('length', 1),
100        ByteField('max_responses', 0),
101    ]
102
103
104bind_layers(HCI_Command_Hdr, HCI_Cmd_Inquiry, opcode=0x0401)
105bind_layers(HCI_Command_Hdr, HCI_Cmd_Create_Connection, opcode=0x0405)
106
107
108class HCI_Event_Inquiry_Result(Packet):
109    name = 'Inquiry Result'
110    fields_desc = [
111        ByteField('num_responses', 0),
112        LEMACField('addr', None),
113        ByteEnumField('page_scan_repetition_mode', 0, {
114            0: 'R0',
115            1: 'R1',
116            2: 'R2'
117        }),
118        LEShortEnumField('rsvd', 0, {0: 'Reserved'}),
119        X3BytesField('class_of_device', 0),
120        LEShortField('clock_offset', 0),
121    ]
122
123
124class HCI_Event_Connection_Complete(Packet):
125    name = 'Connection Complete'
126    fields_desc = [
127        ByteField('status', 0),
128        LEShortField('handle', 0xffff),
129        LEMACField('addr', None),
130        ByteField('link_type', 1),
131        ByteField('encryption_mode', 0),
132    ]
133
134
135class HCI_Event_Remote_Name_Request_Complete(Packet):
136    name = 'Remote Name Request Complete'
137    fields_desc = [
138        ByteField('status', 0),
139        LEMACField('addr', None),
140    ]
141
142
143class HCI_Event_Read_Remote_Supported_Features_Complete(Packet):
144    name = 'Read Remote Supported Features Complete'
145    fields_desc = [
146        ByteField('status', 0),
147        LEShortField('handle', 0xffff),
148        XLELongField('features', 0x0123456789abcdef),
149    ]
150
151
152class HCI_Event_Read_Remote_Version_Information_Complete(Packet):
153    name = 'Read Remote Version Information Complete'
154    fields_desc = [
155        ByteField('status', 0),
156        LEShortField('handle', 0xffff),
157        ByteField('version', 0),
158        LEShortField('manufacturer_name', 0),
159        LEShortField('subversion', 0),
160    ]
161
162
163class HCI_Event_Read_Clock_Offset_Complete(Packet):
164    name = 'Read Clock Offset Complete'
165    fields_desc = [
166        ByteField('status', 0),
167        LEShortField('handle', 0xffff),
168        LEShortField('offset', 0xffff),
169    ]
170
171
172class HCI_Event_Read_Remote_Extended_Features_Complete(Packet):
173    name = 'Read Remote Supported Features Complete'
174    fields_desc = [
175        ByteField('status', 0),
176        LEShortField('handle', 0xffff),
177        ByteField('page_number', 0),
178        ByteField('max_page_number', 0),
179        XLELongField('features', 0x0123456789abcdef),
180    ]
181
182
183class HCI_Event_Extended_Inquiry_Result(Packet):
184    name = 'Extended Inquiry Result'
185    fields_desc = [
186        ByteField('num_responses', 1),
187        LEMACField('addr', None),
188        ByteEnumField('page_scan_repetition_mode', 0, {
189            0: 'R0',
190            1: 'R1',
191            2: 'R2'
192        }),
193        ByteEnumField('rsvd', 0, {0: 'Reserved'}),
194        X3BytesField('class_of_device', 0),
195        LEShortField('clock_offset', 0),
196        SignedByteField('rssi', -20),
197        PacketListField('extended_inquiry_response', [], EIR_Hdr, 1),
198    ]
199
200
201bind_layers(HCI_Event_Hdr, HCI_Event_Inquiry_Result, code=0x02)
202bind_layers(HCI_Event_Hdr, HCI_Event_Connection_Complete, code=0x03)
203bind_layers(HCI_Event_Hdr, HCI_Event_Remote_Name_Request_Complete, code=0x07)
204bind_layers(HCI_Event_Hdr, HCI_Event_Read_Remote_Supported_Features_Complete, code=0x0b)
205bind_layers(HCI_Event_Hdr, HCI_Event_Read_Remote_Version_Information_Complete, code=0x0c)
206bind_layers(HCI_Event_Hdr, HCI_Event_Read_Clock_Offset_Complete, code=0x1c)
207bind_layers(HCI_Event_Hdr, HCI_Event_Read_Remote_Extended_Features_Complete, code=0x23)
208bind_layers(HCI_Event_Hdr, HCI_Event_Extended_Inquiry_Result, code=0x2f)
209""" END SCAPY stuff"""
210
211
212class HCISocket(SuperSocket):
213    """Simple wrapper class for a socket object.
214
215  Attributes:
216    socket: The underlying socket created for the specified address and port.
217  """
218
219    def __init__(self, port):
220        self.done_ = False
221        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
222        s.connect(('localhost', port))
223        self.ins = self.outs = s
224        self.packets_ = queue.Queue()
225        self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
226        self.rx_thread_.start()
227
228    def rx_bytes(self, size):
229        while not self.done_:
230            raw_bytes = b''
231            while len(raw_bytes) < size and not self.done_:
232                more_raw_bytes = self.ins.recv(min(size - len(raw_bytes), 2048))
233                if more_raw_bytes:
234                    raw_bytes += more_raw_bytes
235            return raw_bytes
236
237    def rx_thread_body(self):
238        while not self.done_:
239            payload_length = 0
240            # Read the type
241            type_byte = self.rx_bytes(1)
242            if not type_byte:
243                continue
244            # Read the Header
245            header = b''
246            if type_byte == b'\x01':  # Command
247                header = self.rx_bytes(3)
248                if not header:
249                    continue
250                payload_length = header[2]
251            elif type_byte == b'\x02':  # ACL
252                header = self.rx_bytes(4)
253                if not header:
254                    continue
255                payload_length = header[3] << 8
256                payload_length |= header[2]
257            elif type_byte == b'\x03':  # SCO
258                header = self.rx_bytes(3)
259                if not header:
260                    continue
261                payload_length = header[2]
262            elif type_byte == b'\x04':  # Event
263                header = self.rx_bytes(2)
264                if not header:
265                    continue
266                payload_length = header[1]
267            else:
268                self.done_ = True
269                print('Rx: type_byte ' + hex(type_byte[0]))
270            # Read the Payload
271            payload = self.rx_bytes(payload_length) if payload_length != 0 else b''
272            packet_bytes = type_byte + header + payload
273            packet = HCI_Hdr(packet_bytes)
274            print('Rx: ' + packet.__repr__())
275            self.packets_.put(packet)
276
277    def get_packet(self):
278        if self.packets_.empty():
279            return False
280        return self.packets_.get()
281
282    def tell_rx_thread_to_quit(self):
283        self.done_ = True
284        self.rx_thread_.join()
285
286
287class HCIShell(cmd.Cmd):
288    """Shell for sending binary data to a port.
289
290  """
291
292    def __init__(self, hci):
293        cmd.Cmd.__init__(self)
294        self._hci = hci
295
296    def do_send(self, args):
297        """Arguments: dev_type_str Add a new device of type dev_type_str.
298
299    """
300        self._hci.send_binary(args.split())
301
302    def do_connect(self, args):
303        """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds)
304
305    """
306        split_args = args.split()
307        address = split_args[0] if len(split_args) > 0 else 'NULL'
308        timeout = int(split_args[1]) if len(split_args) > 1 else 2
309        num_responses = 0
310        connect = HCI_Hdr(type='Command') / HCI_Command_Hdr(opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address)
311        self._hci.send(connect)
312        status = None
313        while status == None:
314            response = self._hci.get_packet()
315            if response == False:
316                continue
317            if response[HCI_Hdr].type == HCI_Hdr(
318                    type='Event'
319            ).type and response[HCI_Event_Hdr].code == 0xf and response[HCI_Event_Command_Status].opcode == connect[HCI_Command_Hdr].opcode:
320                status = response[HCI_Event_Command_Status].status
321        if status != HCI_Event_Command_Status(status='pending').status:
322            print('Connection failed with status = ' + str(status))
323            return
324
325        handle = None
326        while handle == None:
327            connection_complete = self._hci.get_packet()
328            if connection_complete == False:
329                continue
330            if (connection_complete[HCI_Hdr].type == HCI_Hdr(type='Event').type) and (
331                    connection_complete[HCI_Event_Hdr].code == 0x3):
332                status = connection_complete[HCI_Event_Connection_Complete].status
333                if status != 0:
334                    print('Connection complete with failed status = ' + str(status))
335                    return
336                handle = connection_complete[HCI_Event_Connection_Complete].handle
337                print('Connection established with handle ' + str(handle))
338                connection_complete.show()
339                hexdump(connection_complete)
340
341        l2cap_done = False
342        while l2cap_done == None:
343            l2cap_req = self._hci.get_packet()
344            if l2cap_req == False:
345                continue
346            if (l2cap_req[HCI_Hdr].type == HCI_Hdr(type='ACL Data').type) and (l2cap_req[L2CAP_Hdr].cid == L2CAP_Hdr(
347                    cid='control').cid) and (l2cap_req[L2CAP_CmdHdr].code == L2CAP_CmdHdr(code='info_req').code) and (
348                        l2cap_req[L2CAP_InfoReq].type == L2CAP_InfoReq(type='FEAT_MASK').type):
349                print('Send Features packet' +
350                      HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
351                      L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr(code='info_resp', id=146, len=8) / L2CAP_InfoResp(
352                          type=l2cap_req[L2CAP_InfoResp].type, result='success', data=b'\xb8\x00\x00\x00').__repr__())
353                self._hci.send(
354                    HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
355                    L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr(code='info_resp', id=146, len=8) / L2CAP_InfoResp(
356                        type=l2cap_req[L2CAP_InfoResp].type, result='success', data=b'\xb8\x00\x00\x00'))
357
358    def do_le_scan(self, args):
359        """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices
360
361    """
362        split_args = args.split()
363        enable = int(split_args[0]) if len(split_args) > 0 else 1
364        filter_dups = int(split_args[1]) if len(split_args) > 1 else 1
365        set_scan_parameters = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode=0x200b) / HCI_Cmd_LE_Set_Scan_Parameters(type=1)
366        print('Tx: ' + set_scan_parameters.__repr__())
367        self._hci.send(set_scan_parameters)
368        set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable(
369            enable=enable, filter_dups=filter_dups)
370        print('Tx: ' + set_scan_enable.__repr__())
371        self._hci.send(set_scan_enable)
372
373    def do_scan(self, args):
374        """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices
375
376    """
377        split_args = args.split()
378        scan_time = int(split_args[0]) if len(split_args) > 0 else 0
379        max_responses = int(split_args[1]) if len(split_args) > 1 else 0
380        num_responses = 0
381        inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr(opcode=0x0401) / HCI_Cmd_Inquiry(
382            length=scan_time, max_responses=max_responses)
383        print('Tx: ' + inquiry.__repr__())
384        self._hci.send(inquiry)
385
386    def do_wait(self, args):
387        """Arguments: time in seconds (float).
388    """
389        sleep_time = float(args.split()[0])
390        time.sleep(sleep_time)
391
392    def do_quit(self, args):
393        """Arguments: None.
394
395    Exits.
396    """
397        self._hci.tell_rx_thread_to_quit()
398        self._hci.close()
399        print('Goodbye.')
400        return True
401
402    def do_help(self, args):
403        """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
404
405    """
406        if (len(args) == 0):
407            cmd.Cmd.do_help(self, args)
408
409
410def main(argv):
411    if len(argv) != 2:
412        print('Usage: python hci_socket.py [port]')
413        return
414    try:
415        port = int(argv[1])
416    except ValueError:
417        print('Error parsing port.')
418    else:
419        try:
420            hci = HCISocket(port)
421        except socket.error as e:
422            print('Error connecting to socket: %s' % e)
423        except:
424            print('Error creating (check arguments).')
425        else:
426            hci_shell = HCIShell(hci)
427            hci_shell.prompt = '$ '
428            hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' + 'Type \'help\' for more information.')
429
430
431if __name__ == '__main__':
432    main(sys.argv)
433