17Test script to execute BLE connection,run data traffic and calculating RSSI value of the remote BLE device.
20import os
21import logging
22import pandas as pd
23import numpy as np
24import time
25import acts_contrib.test_utils.bt.bt_test_utils as btutils
26import acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure as bokeh_figure
27from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_coc_connection
28from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_gatt_disconnection
29from acts_contrib.test_utils.bt.ble_performance_test_utils import start_advertising_and_scanning
30from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
31from acts_contrib.test_utils.bt.bt_test_utils import cleanup_scanners_and_advertisers
32from acts_contrib.test_utils.bt.ble_performance_test_utils import establish_ble_connection
33from acts_contrib.test_utils.bt.bt_constants import l2cap_max_inactivity_delay_after_disconnect
34from acts_contrib.test_utils.bt.ble_performance_test_utils import run_ble_throughput
35from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_rssi
36from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_scan_rssi
37from acts_contrib.test_utils.bt.bt_test_utils import reset_bluetooth
38from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
39from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
40from acts.signals import TestPass
41from acts import utils
44MAX_RSSI = 92
47class BleRangeTest(BluetoothBaseTest):
48    active_adv_callback_list = []
49    active_scan_callback_list = []
51    def __init__(self, configs):
52        super().__init__(configs)
53        req_params = ['attenuation_vector', 'system_path_loss']
54        #'attenuation_vector' is a dict containing: start, stop and step of
55        #attenuation changes
56        self.unpack_userparams(req_params)
58    def setup_class(self):
59        super().setup_class()
60        self.client_ad = self.android_devices[0]
61        # The client which is scanning will need location to be enabled in order to
62        # start scan and get scan results.
63        utils.set_location_service(self.client_ad, True)
64        self.server_ad = self.android_devices[1]
65        # Note that some tests required a third device.
66        if hasattr(self, 'attenuators'):
67            self.attenuator = self.attenuators[0]
68            self.attenuator.set_atten(INIT_ATTEN)
69        self.attenuation_range = range(self.attenuation_vector['start'],
70                                       self.attenuation_vector['stop'] + 1,
71                                       self.attenuation_vector['step'])
72        self.log_path = os.path.join(logging.log_path, 'results')
73        os.makedirs(self.log_path, exist_ok=True)
74        # BokehFigure object
75        self.plot = bokeh_figure.BokehFigure(
76            title='{}'.format(self.current_test_name),
77            x_label='Pathloss (dB)',
78            primary_y_label='BLE RSSI (dBm)',
79            secondary_y_label='DUT Tx Power (dBm)',
80            axis_label_size='16pt')
81        if len(self.android_devices) > 2:
82            self.server2_ad = self.android_devices[2]
84        btutils.enable_bqr(self.android_devices)
85        return setup_multiple_devices_for_bt_test(self.android_devices)
87    def teardown_test(self):
88        self.client_ad.droid.bluetoothSocketConnStop()
89        self.server_ad.droid.bluetoothSocketConnStop()
90        if hasattr(self, 'attenuator'):
91            self.attenuator.set_atten(INIT_ATTEN)
92        # Give sufficient time for the physical LE link to be disconnected.
93        time.sleep(l2cap_max_inactivity_delay_after_disconnect)
94        cleanup_scanners_and_advertisers(self.client_ad,
95                                         self.active_scan_callback_list,
96                                         self.server_ad,
97                                         self.active_adv_callback_list)
99    def test_ble_gatt_connection_range(self):
100        """Test GATT connection over LE and read RSSI.
102        Test will establish a gatt connection between a GATT server and GATT
103        client then read the RSSI for each attenuation until the BLE link get disconnect
105        Expected Result:
106        Verify that a connection was established and then disconnected
107        successfully. Verify that the RSSI was read correctly.
109        """
110        attenuation = []
111        ble_rssi = []
112        dut_pwlv = []
113        path_loss = []
114        bluetooth_gatt, gatt_callback, adv_callback, gatt_server = establish_ble_connection(
115            self.client_ad, self.server_ad)
116        for atten in self.attenuation_range:
117            ramp_attenuation(self.attenuator, atten)
118            self.log.info('Set attenuation to %d dB', atten)
119            rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv()
120            self.log.info(
121                "Dut BLE RSSI:{} and Pwlv:{} with attenuation:{}".format(
122                    rssi_primary, pwlv_primary, atten))
123            rssi = self.client_ad.droid.gattClientReadRSSI(gatt_server)
124            if type(rssi_primary) != str:
125                attenuation.append(atten)
126                ble_rssi.append(rssi_primary)
127                dut_pwlv.append(pwlv_primary)
128                path_loss.append(atten + self.system_path_loss)
129                df = pd.DataFrame({
130                    'Attenuation': attenuation,
131                    'BLE_RSSI': ble_rssi,
132                    'Dut_PwLv': dut_pwlv,
133                    'Pathloss': path_loss
134                })
135                filepath = os.path.join(
136                    self.log_path, '{}.csv'.format(self.current_test_name))
137            else:
138                self.plot_ble_graph(df)
139                df.to_csv(filepath, encoding='utf-8')
140                raise TestPass('Reached BLE Max Range, BLE Gatt disconnected')
141        ble_gatt_disconnection(self.client_ad, bluetooth_gatt, gatt_callback)
142        self.plot_ble_graph(df)
143        df.to_csv(filepath, encoding='utf-8')
144        self.server_ad.droid.bleStopBleAdvertising(adv_callback)
145        return True
147    def test_ble_coc_throughput_range(self):
148        """Test LE CoC data transfer and read RSSI with each attenuation
150        Test will establish a L2CAP CoC connection between client and server
151        then start BLE date transfer and read the RSSI for each attenuation
152        until the BLE link get disconnect
154        Expected Result:
155        BLE data transfer successful and Read RSSi Value of the server
157        """
158        attenuation = []
159        ble_rssi = []
160        throughput = []
161        dut_pwlv = []
162        path_loss = []
163        self.plot_throughput = bokeh_figure.BokehFigure(
164            title='{}'.format(self.current_test_name),
165            x_label='Pathloss (dB)',
166            primary_y_label='BLE Throughput (bits per sec)',
167            axis_label_size='16pt')
168        status, gatt_callback, gatt_server, bluetooth_gatt, client_conn_id = ble_coc_connection(
169            self.server_ad, self.client_ad)
170        for atten in self.attenuation_range:
171            ramp_attenuation(self.attenuator, atten)
172            self.log.info('Set attenuation to %d dB', atten)
173            datarate = run_ble_throughput(self.client_ad, client_conn_id,
174                                          self.server_ad)
175            rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv()
176            self.log.info(
177                "BLE RSSI is:{} dBm and Tx Power:{} with attenuation {} dB with throughput:{}bits per sec"
178                .format(rssi_primary, pwlv_primary, atten, datarate))
179            if type(rssi_primary) != str:
180                attenuation.append(atten)
181                ble_rssi.append(rssi_primary)
182                dut_pwlv.append(pwlv_primary)
183                throughput.append(datarate)
184                path_loss.append(atten + self.system_path_loss)
185                df = pd.DataFrame({
186                    'Attenuation': attenuation,
187                    'BLE_RSSI': ble_rssi,
188                    'Dut_PwLv': dut_pwlv,
189                    'Throughput': throughput,
190                    'Pathloss': path_loss
191                })
192                filepath = os.path.join(
193                    self.log_path, '{}.csv'.format(self.current_test_name))
194                results_file_path = os.path.join(
195                    self.log_path,
196                    '{}_throughput.html'.format(self.current_test_name))
197                self.plot_throughput.add_line(df['Pathloss'],
198                                              df['Throughput'],
199                                              legend='BLE Throughput',
200                                              marker='square_x')
201            else:
202                self.plot_ble_graph(df)
203                self.plot_throughput.generate_figure()
204                bokeh_figure.BokehFigure.save_figures([self.plot_throughput],
205                                                      results_file_path)
206                df.to_csv(filepath, encoding='utf-8')
207                raise TestPass('Reached BLE Max Range, BLE Gatt disconnected')
208        self.plot_ble_graph(df)
209        self.plot_throughput.generate_figure()
210        bokeh_figure.BokehFigure.save_figures([self.plot_throughput],
211                                              results_file_path)
212        df.to_csv(filepath, encoding='utf-8')
213        ble_gatt_disconnection(self.server_ad, bluetooth_gatt, gatt_callback)
214        return True
216    def test_ble_scan_remote_rssi(self):
217        data_points = []
218        for atten in self.attenuation_range:
219            csv_path = os.path.join(
220                self.log_path,
221                '{}_attenuation_{}.csv'.format(self.current_test_name, atten))
222            ramp_attenuation(self.attenuator, atten)
223            self.log.info('Set attenuation to %d dB', atten)
224            adv_callback, scan_callback = start_advertising_and_scanning(
225                self.client_ad, self.server_ad, Legacymode=False)
226            self.active_adv_callback_list.append(adv_callback)
227            self.active_scan_callback_list.append(scan_callback)
228            average_rssi, raw_rssi, timestamp = read_ble_scan_rssi(
229                self.client_ad, scan_callback)
230            self.log.info(
231                "Scanned rssi list of the remote device is :{}".format(
232                    raw_rssi))
233            self.log.info(
234                "BLE RSSI of the remote device is:{} dBm".format(average_rssi))
235            min_rssi = min(raw_rssi)
236            max_rssi = max(raw_rssi)
237            path_loss = atten + self.system_path_loss
238            std_deviation = np.std(raw_rssi)
239            data_point = {
240                'Attenuation': atten,
241                'BLE_RSSI': average_rssi,
242                'Pathloss': path_loss,
243                'Min_RSSI': min_rssi,
244                'Max_RSSI': max_rssi,
245                'Standard_deviation': std_deviation
246            }
247            data_points.append(data_point)
248            df = pd.DataFrame({'timestamp': timestamp, 'raw rssi': raw_rssi})
249            df.to_csv(csv_path, encoding='utf-8', index=False)
250            try:
251                self.server_ad.droid.bleAdvSetStopAdvertisingSet(adv_callback)
252            except Exception as err:
253                self.log.warning(
254                    "Failed to stop advertisement: {}".format(err))
255                reset_bluetooth([self.server_ad])
256            self.client_ad.droid.bleStopBleScan(scan_callback)
257        filepath = os.path.join(
258            self.log_path, '{}_summary.csv'.format(self.current_test_name))
259        ble_df = pd.DataFrame(data_points)
260        ble_df.to_csv(filepath, encoding='utf-8')
261        return True
263    def plot_ble_graph(self, df):
264        """ Plotting BLE RSSI and Throughput with Attenuation.
266        Args:
267            df: Summary of results contains attenuation, BLE_RSSI and Throughput
268        """
269        self.plot.add_line(df['Pathloss'],
270                           df['BLE_RSSI'],
271                           legend='DUT BLE RSSI (dBm)',
272                           marker='circle_x')
273        self.plot.add_line(df['Pathloss'],
274                           df['Dut_PwLv'],
275                           legend='DUT TX Power (dBm)',
276                           marker='hex',
277                           y_axis='secondary')
278        results_file_path = os.path.join(
279            self.log_path, '{}.html'.format(self.current_test_name))
280        self.plot.generate_figure()
281        bokeh_figure.BokehFigure.save_figures([self.plot], results_file_path)
283    def get_ble_rssi_and_pwlv(self):
284        process_data_dict = btutils.get_bt_metric(self.client_ad)
285        rssi_primary = process_data_dict.get('rssi')
286        pwlv_primary = process_data_dict.get('pwlv')
287        rssi_primary = rssi_primary.get(self.client_ad.serial)
288        pwlv_primary = pwlv_primary.get(self.client_ad.serial)
289        return rssi_primary, pwlv_primary