1#!/usr/bin/env python3 2# 3# Copyright 2017 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16""" 17Test script to execute BLE connection,run data traffic and calculating RSSI value of the remote BLE device. 18""" 19 20import os 21import logging 22import pandas as pd 23import numpy as np 24import time 25import acts_contrib.test_utils.bt.bt_test_utils as btutils 26import acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure as bokeh_figure 27from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_coc_connection 28from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_gatt_disconnection 29from acts_contrib.test_utils.bt.ble_performance_test_utils import start_advertising_and_scanning 30from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 31from acts_contrib.test_utils.bt.bt_test_utils import cleanup_scanners_and_advertisers 32from acts_contrib.test_utils.bt.ble_performance_test_utils import establish_ble_connection 33from acts_contrib.test_utils.bt.bt_constants import l2cap_max_inactivity_delay_after_disconnect 34from acts_contrib.test_utils.bt.ble_performance_test_utils import run_ble_throughput 35from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_rssi 36from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_scan_rssi 37from acts_contrib.test_utils.bt.bt_test_utils import reset_bluetooth 38from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation 39from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test 40from acts.signals import TestPass 41from acts import utils 42 43INIT_ATTEN = 0 44MAX_RSSI = 92 45 46 47class BleRangeTest(BluetoothBaseTest): 48 active_adv_callback_list = [] 49 active_scan_callback_list = [] 50 51 def __init__(self, configs): 52 super().__init__(configs) 53 req_params = ['attenuation_vector', 'system_path_loss'] 54 #'attenuation_vector' is a dict containing: start, stop and step of 55 #attenuation changes 56 self.unpack_userparams(req_params) 57 58 def setup_class(self): 59 super().setup_class() 60 self.client_ad = self.android_devices[0] 61 # The client which is scanning will need location to be enabled in order to 62 # start scan and get scan results. 63 utils.set_location_service(self.client_ad, True) 64 self.server_ad = self.android_devices[1] 65 # Note that some tests required a third device. 66 if hasattr(self, 'attenuators'): 67 self.attenuator = self.attenuators[0] 68 self.attenuator.set_atten(INIT_ATTEN) 69 self.attenuation_range = range(self.attenuation_vector['start'], 70 self.attenuation_vector['stop'] + 1, 71 self.attenuation_vector['step']) 72 self.log_path = os.path.join(logging.log_path, 'results') 73 os.makedirs(self.log_path, exist_ok=True) 74 # BokehFigure object 75 self.plot = bokeh_figure.BokehFigure( 76 title='{}'.format(self.current_test_name), 77 x_label='Pathloss (dB)', 78 primary_y_label='BLE RSSI (dBm)', 79 secondary_y_label='DUT Tx Power (dBm)', 80 axis_label_size='16pt') 81 if len(self.android_devices) > 2: 82 self.server2_ad = self.android_devices[2] 83 84 btutils.enable_bqr(self.android_devices) 85 return setup_multiple_devices_for_bt_test(self.android_devices) 86 87 def teardown_test(self): 88 self.client_ad.droid.bluetoothSocketConnStop() 89 self.server_ad.droid.bluetoothSocketConnStop() 90 if hasattr(self, 'attenuator'): 91 self.attenuator.set_atten(INIT_ATTEN) 92 # Give sufficient time for the physical LE link to be disconnected. 93 time.sleep(l2cap_max_inactivity_delay_after_disconnect) 94 cleanup_scanners_and_advertisers(self.client_ad, 95 self.active_scan_callback_list, 96 self.server_ad, 97 self.active_adv_callback_list) 98 99 def test_ble_gatt_connection_range(self): 100 """Test GATT connection over LE and read RSSI. 101 102 Test will establish a gatt connection between a GATT server and GATT 103 client then read the RSSI for each attenuation until the BLE link get disconnect 104 105 Expected Result: 106 Verify that a connection was established and then disconnected 107 successfully. Verify that the RSSI was read correctly. 108 109 """ 110 attenuation = [] 111 ble_rssi = [] 112 dut_pwlv = [] 113 path_loss = [] 114 bluetooth_gatt, gatt_callback, adv_callback, gatt_server = establish_ble_connection( 115 self.client_ad, self.server_ad) 116 for atten in self.attenuation_range: 117 ramp_attenuation(self.attenuator, atten) 118 self.log.info('Set attenuation to %d dB', atten) 119 rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv() 120 self.log.info( 121 "Dut BLE RSSI:{} and Pwlv:{} with attenuation:{}".format( 122 rssi_primary, pwlv_primary, atten)) 123 rssi = self.client_ad.droid.gattClientReadRSSI(gatt_server) 124 if type(rssi_primary) != str: 125 attenuation.append(atten) 126 ble_rssi.append(rssi_primary) 127 dut_pwlv.append(pwlv_primary) 128 path_loss.append(atten + self.system_path_loss) 129 df = pd.DataFrame({ 130 'Attenuation': attenuation, 131 'BLE_RSSI': ble_rssi, 132 'Dut_PwLv': dut_pwlv, 133 'Pathloss': path_loss 134 }) 135 filepath = os.path.join( 136 self.log_path, '{}.csv'.format(self.current_test_name)) 137 else: 138 self.plot_ble_graph(df) 139 df.to_csv(filepath, encoding='utf-8') 140 raise TestPass('Reached BLE Max Range, BLE Gatt disconnected') 141 ble_gatt_disconnection(self.client_ad, bluetooth_gatt, gatt_callback) 142 self.plot_ble_graph(df) 143 df.to_csv(filepath, encoding='utf-8') 144 self.server_ad.droid.bleStopBleAdvertising(adv_callback) 145 return True 146 147 def test_ble_coc_throughput_range(self): 148 """Test LE CoC data transfer and read RSSI with each attenuation 149 150 Test will establish a L2CAP CoC connection between client and server 151 then start BLE date transfer and read the RSSI for each attenuation 152 until the BLE link get disconnect 153 154 Expected Result: 155 BLE data transfer successful and Read RSSi Value of the server 156 157 """ 158 attenuation = [] 159 ble_rssi = [] 160 throughput = [] 161 dut_pwlv = [] 162 path_loss = [] 163 self.plot_throughput = bokeh_figure.BokehFigure( 164 title='{}'.format(self.current_test_name), 165 x_label='Pathloss (dB)', 166 primary_y_label='BLE Throughput (bits per sec)', 167 axis_label_size='16pt') 168 status, gatt_callback, gatt_server, bluetooth_gatt, client_conn_id = ble_coc_connection( 169 self.server_ad, self.client_ad) 170 for atten in self.attenuation_range: 171 ramp_attenuation(self.attenuator, atten) 172 self.log.info('Set attenuation to %d dB', atten) 173 datarate = run_ble_throughput(self.client_ad, client_conn_id, 174 self.server_ad) 175 rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv() 176 self.log.info( 177 "BLE RSSI is:{} dBm and Tx Power:{} with attenuation {} dB with throughput:{}bits per sec" 178 .format(rssi_primary, pwlv_primary, atten, datarate)) 179 if type(rssi_primary) != str: 180 attenuation.append(atten) 181 ble_rssi.append(rssi_primary) 182 dut_pwlv.append(pwlv_primary) 183 throughput.append(datarate) 184 path_loss.append(atten + self.system_path_loss) 185 df = pd.DataFrame({ 186 'Attenuation': attenuation, 187 'BLE_RSSI': ble_rssi, 188 'Dut_PwLv': dut_pwlv, 189 'Throughput': throughput, 190 'Pathloss': path_loss 191 }) 192 filepath = os.path.join( 193 self.log_path, '{}.csv'.format(self.current_test_name)) 194 results_file_path = os.path.join( 195 self.log_path, 196 '{}_throughput.html'.format(self.current_test_name)) 197 self.plot_throughput.add_line(df['Pathloss'], 198 df['Throughput'], 199 legend='BLE Throughput', 200 marker='square_x') 201 else: 202 self.plot_ble_graph(df) 203 self.plot_throughput.generate_figure() 204 bokeh_figure.BokehFigure.save_figures([self.plot_throughput], 205 results_file_path) 206 df.to_csv(filepath, encoding='utf-8') 207 raise TestPass('Reached BLE Max Range, BLE Gatt disconnected') 208 self.plot_ble_graph(df) 209 self.plot_throughput.generate_figure() 210 bokeh_figure.BokehFigure.save_figures([self.plot_throughput], 211 results_file_path) 212 df.to_csv(filepath, encoding='utf-8') 213 ble_gatt_disconnection(self.server_ad, bluetooth_gatt, gatt_callback) 214 return True 215 216 def test_ble_scan_remote_rssi(self): 217 data_points = [] 218 for atten in self.attenuation_range: 219 csv_path = os.path.join( 220 self.log_path, 221 '{}_attenuation_{}.csv'.format(self.current_test_name, atten)) 222 ramp_attenuation(self.attenuator, atten) 223 self.log.info('Set attenuation to %d dB', atten) 224 adv_callback, scan_callback = start_advertising_and_scanning( 225 self.client_ad, self.server_ad, Legacymode=False) 226 self.active_adv_callback_list.append(adv_callback) 227 self.active_scan_callback_list.append(scan_callback) 228 average_rssi, raw_rssi, timestamp = read_ble_scan_rssi( 229 self.client_ad, scan_callback) 230 self.log.info( 231 "Scanned rssi list of the remote device is :{}".format( 232 raw_rssi)) 233 self.log.info( 234 "BLE RSSI of the remote device is:{} dBm".format(average_rssi)) 235 min_rssi = min(raw_rssi) 236 max_rssi = max(raw_rssi) 237 path_loss = atten + self.system_path_loss 238 std_deviation = np.std(raw_rssi) 239 data_point = { 240 'Attenuation': atten, 241 'BLE_RSSI': average_rssi, 242 'Pathloss': path_loss, 243 'Min_RSSI': min_rssi, 244 'Max_RSSI': max_rssi, 245 'Standard_deviation': std_deviation 246 } 247 data_points.append(data_point) 248 df = pd.DataFrame({'timestamp': timestamp, 'raw rssi': raw_rssi}) 249 df.to_csv(csv_path, encoding='utf-8', index=False) 250 try: 251 self.server_ad.droid.bleAdvSetStopAdvertisingSet(adv_callback) 252 except Exception as err: 253 self.log.warning( 254 "Failed to stop advertisement: {}".format(err)) 255 reset_bluetooth([self.server_ad]) 256 self.client_ad.droid.bleStopBleScan(scan_callback) 257 filepath = os.path.join( 258 self.log_path, '{}_summary.csv'.format(self.current_test_name)) 259 ble_df = pd.DataFrame(data_points) 260 ble_df.to_csv(filepath, encoding='utf-8') 261 return True 262 263 def plot_ble_graph(self, df): 264 """ Plotting BLE RSSI and Throughput with Attenuation. 265 266 Args: 267 df: Summary of results contains attenuation, BLE_RSSI and Throughput 268 """ 269 self.plot.add_line(df['Pathloss'], 270 df['BLE_RSSI'], 271 legend='DUT BLE RSSI (dBm)', 272 marker='circle_x') 273 self.plot.add_line(df['Pathloss'], 274 df['Dut_PwLv'], 275 legend='DUT TX Power (dBm)', 276 marker='hex', 277 y_axis='secondary') 278 results_file_path = os.path.join( 279 self.log_path, '{}.html'.format(self.current_test_name)) 280 self.plot.generate_figure() 281 bokeh_figure.BokehFigure.save_figures([self.plot], results_file_path) 282 283 def get_ble_rssi_and_pwlv(self): 284 process_data_dict = btutils.get_bt_metric(self.client_ad) 285 rssi_primary = process_data_dict.get('rssi') 286 pwlv_primary = process_data_dict.get('pwlv') 287 rssi_primary = rssi_primary.get(self.client_ad.serial) 288 pwlv_primary = pwlv_primary.get(self.client_ad.serial) 289 return rssi_primary, pwlv_primary 290