1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Ble libraries
18"""
19
import time
from queue import Empty

from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes
from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes
from acts_contrib.test_utils.bt.bt_constants import small_timeout
from acts_contrib.test_utils.bt.bt_constants import adv_fail
from acts_contrib.test_utils.bt.bt_constants import adv_succ
from acts_contrib.test_utils.bt.bt_constants import advertising_set_on_own_address_read
from acts_contrib.test_utils.bt.bt_constants import advertising_set_started
from acts_contrib.test_utils.bt.bt_test_utils import generate_ble_advertise_objects
31
32
class BleLib():
    """Helper for configuring, starting and stopping BLE advertisements.

    All device interaction goes through ``dut.droid`` (RPC calls) and
    ``dut.ed`` (event polling). Most public methods take a single ``line``
    string argument; several ignore it and only accept it so that every
    method shares the same one-argument call shape.
    """

    # String inputs (compared case-insensitively, after stripping) that are
    # interpreted as boolean False by _parse_bool.
    _FALSE_STRINGS = ("", "0", "false", "no", "off")

    def __init__(self, log, dut):
        """Store the logger and device handle and reset tracking state.

        Args:
            log: Logger used for informational and error messages.
            dut: Device under test; must expose ``droid``, ``ed`` and ``log``.
        """
        # Callback IDs of advertisements started via bleStartBleAdvertising*.
        self.advertisement_list = []
        self.dut = dut
        self.log = log
        # Seconds to wait for advertising-set events.
        self.default_timeout = 5
        # Callback IDs of advertising sets started via
        # bleAdvSetStartAdvertisingSet.
        self.set_advertisement_list = []
        # Template that expands a 4-character UUID into a full 128-bit UUID.
        self.generic_uuid = "0000{}-0000-1000-8000-00805f9b34fb"

    @staticmethod
    def _parse_bool(value):
        """Interpret a user supplied flag as a boolean.

        Strings are compared case-insensitively against ``_FALSE_STRINGS`` so
        that an input such as "false" is not treated as True merely because
        it is a non-empty string. Non-string values use normal truthiness.

        Args:
            value: The flag to interpret (typically a string).

        Returns:
            The boolean meaning of ``value``.
        """
        if isinstance(value, str):
            return value.strip().lower() not in BleLib._FALSE_STRINGS
        return bool(value)

    def _verify_ble_adv_started(self, advertise_callback):
        """Helper for verifying if an advertisement started or not.

        Args:
            advertise_callback: Callback ID of the advertisement to check.

        Returns:
            True if the success event was received first, False if the
            failure event was received or no event arrived in time.
        """
        success_name = adv_succ.format(advertise_callback)
        regex = "({}|{})".format(success_name,
                                 adv_fail.format(advertise_callback))
        try:
            event = self.dut.ed.pop_events(regex, 5, small_timeout)
        except Empty:
            # Assumes the event dispatcher signals a timeout with
            # queue.Empty, which is what this handler was written to catch.
            self.dut.log.error("Failed to get success or failed event.")
            return False
        if event[0]["name"] == success_name:
            self.dut.log.info("Advertisement started successfully.")
            return True
        self.dut.log.info("Advertisement failed to start.")
        return False

    def _start_advertisement(self, connectable, scan_response=False):
        """Start one low-latency advertisement and track it on success.

        Args:
            connectable: Whether the advertisement is connectable.
            scan_response: When true, the advertise data is also supplied as
                the scan response.

        Returns:
            True if the advertisement started and its callback ID was added
            to ``advertisement_list``, False otherwise.
        """
        droid = self.dut.droid
        droid.bleSetAdvertiseSettingsAdvertiseMode(
            ble_advertise_settings_modes['low_latency'])
        droid.bleSetAdvertiseSettingsIsConnectable(connectable)
        advertise_callback, advertise_data, advertise_settings = (
            generate_ble_advertise_objects(droid))
        if scan_response:
            droid.bleStartBleAdvertisingWithScanResponse(
                advertise_callback, advertise_data, advertise_settings,
                advertise_data)
        else:
            droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                         advertise_settings)
        if not self._verify_ble_adv_started(advertise_callback):
            return False
        self.log.info("Tracking Callback ID: {}".format(advertise_callback))
        self.advertisement_list.append(advertise_callback)
        self.log.info(self.advertisement_list)
        return True

    def start_generic_connectable_advertisement(self, line):
        """Start a connectable LE advertisement.

        Args:
            line: Any non-empty value enables a scan response that reuses the
                advertise data.
        """
        self._start_advertisement(True, bool(line))

    def start_connectable_advertisement_set(self, line):
        """Start Connectable Advertisement Set.

        Waits for the set-started event and then for the own-address-read
        event, logs the advertiser address and tracks the callback ID in
        ``set_advertisement_list``.

        Args:
            line: Unused.

        Raises:
            Whatever ``dut.ed.pop_event`` raises when an expected event does
            not arrive in time (expected to be queue.Empty).
        """
        adv_callback = self.dut.droid.bleAdvSetGenCallback()
        adv_data = {
            "includeDeviceName": True,
        }
        adv_parameters = {
            "connectable": True,
            "legacyMode": False,
            "primaryPhy": "PHY_LE_1M",
            "secondaryPhy": "PHY_LE_1M",
            "interval": 320
        }
        self.dut.droid.bleAdvSetStartAdvertisingSet(
            adv_parameters, adv_data, None, None, None, 0, 0, adv_callback)
        try:
            evt = self.dut.ed.pop_event(
                advertising_set_started.format(adv_callback),
                self.default_timeout)
        except Empty:
            # Only report the missing event when it really is missing, then
            # re-raise so callers still see the original exception.
            self.log.error("did not receive the set started event!")
            raise
        set_id = evt['data']['setId']
        evt = self.dut.ed.pop_event(
            advertising_set_on_own_address_read.format(set_id),
            self.default_timeout)
        address = evt['data']['address']
        self.log.info("Advertiser address is: {}".format(str(address)))
        self.set_advertisement_list.append(adv_callback)

    def stop_all_advertisement_set(self, line):
        """Stop all Advertisement Sets and forget their callback IDs.

        A failure to stop one set is logged and does not prevent the
        remaining sets from being stopped.

        Args:
            line: Unused.
        """
        for adv in self.set_advertisement_list:
            try:
                self.dut.droid.bleAdvSetStopAdvertisingSet(adv)
            except Exception as err:
                self.log.error("Failed to stop advertisement: {}".format(err))
        # Reset tracking, mirroring stop_all_advertisements, so a later call
        # does not try to stop the same sets again.
        self.set_advertisement_list = []

    def adv_add_service_uuid_list(self, line):
        """Add service UUID to the LE advertisement inputs:
         [uuid1 uuid2 ... uuidN]

        Each 4-character UUID is expanded to a full 128-bit UUID using
        ``generic_uuid``; other values are passed through unchanged.
        """
        uuid_list = [
            self.generic_uuid.format(uuid) if len(uuid) == 4 else uuid
            for uuid in line.split()
        ]
        self.dut.droid.bleSetAdvertiseDataSetServiceUuids(uuid_list)

    def adv_data_include_local_name(self, is_included):
        """Include local name in the advertisement. inputs: [true|false]"""
        self.dut.droid.bleSetAdvertiseDataIncludeDeviceName(
            self._parse_bool(is_included))

    def adv_data_include_tx_power_level(self, is_included):
        """Include tx power level in the advertisement. inputs: [true|false]"""
        self.dut.droid.bleSetAdvertiseDataIncludeTxPowerLevel(
            self._parse_bool(is_included))

    def adv_data_add_manufacturer_data(self, line):
        """Include manufacturer id and data to the advertisement:
        [id data1 data2 ... dataN]

        All values are parsed as base-10 integers.
        """
        info = line.split()
        if not info:
            self.log.info("Need a manufacturer ID")
            return
        manu_id = int(info[0])
        manu_data = [int(data) for data in info[1:]]
        self.dut.droid.bleAddAdvertiseDataManufacturerId(manu_id, manu_data)

    def start_generic_nonconnectable_advertisement(self, line):
        """Start a nonconnectable LE advertisement.

        Args:
            line: Unused.
        """
        self._start_advertisement(False)

    def stop_all_advertisements(self, line):
        """Stop all LE advertisements and forget their callback IDs.

        Args:
            line: Unused.
        """
        for callback_id in self.advertisement_list:
            self.log.info("Stopping Advertisement {}".format(callback_id))
            self.dut.droid.bleStopBleAdvertising(callback_id)
            time.sleep(1)
        self.advertisement_list = []

    def ble_stop_advertisement(self, callback_id):
        """Stop an LE advertisement.

        Args:
            callback_id: Callback ID (int or numeric string) of a tracked
                advertisement. Untracked or missing IDs are logged and
                ignored.
        """
        if not callback_id:
            self.log.info("Need a callback ID")
            return
        callback_id = int(callback_id)
        if callback_id not in self.advertisement_list:
            self.log.info("Callback not in list of advertisements.")
            return
        self.dut.droid.bleStopBleAdvertising(callback_id)
        self.advertisement_list.remove(callback_id)

    def start_max_advertisements(self, line):
        """Start connectable advertisements until one fails to start.

        Args:
            line: Any non-empty value enables a scan response that reuses the
                advertise data.

        Returns:
            False when an advertisement reported that it did not start, True
            when starting an advertisement raised an exception. In both cases
            the number of active advertisements is logged first.
        """
        scan_response = bool(line)
        while True:
            try:
                started = self._start_advertisement(True, scan_response)
            except Exception as err:
                # Record why the loop ended instead of discarding the error.
                self.log.info(
                    "Stopped starting advertisements: {}".format(err))
                self.log.info("Advertisements active: {}".format(
                    len(self.advertisement_list)))
                return True
            if not started:
                self.log.info("Advertisements active: {}".format(
                    len(self.advertisement_list)))
                return False
212