1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Stream music through connected device from phone across different
17attenuations."""
18
19from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest
20from acts.metrics.loggers.blackbox import BlackboxMetricLogger
21from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results
22from acts_contrib.test_utils.bt.BtInterferenceBaseTest import inject_static_wifi_interference
23from multiprocessing import Process, Queue
24from acts.signals import TestPass
25
26DEFAULT_THDN_THRESHOLD = 0.9
27MAX_ATTENUATION = 95
28TIME_OVERHEAD = 2
29
30
31class BtInterferenceStaticTest(BtInterferenceBaseTest):
32
33    def __init__(self, configs):
34        super().__init__(configs)
35        self.bt_attenuation_range = range(self.attenuation_vector['start'],
36                                          self.attenuation_vector['stop'] + 1,
37                                          self.attenuation_vector['step'])
38        self.iperf_duration = self.audio_params['duration'] + TIME_OVERHEAD
39        test_metrics = [
40            'wifi_chan1_rssi', 'wifi_chan6_rssi', 'wifi_chan11_rssi', 'bt_range'
41        ]
42        for metric in test_metrics:
43            setattr(self, '{}_metric'.format(metric),
44                    BlackboxMetricLogger.for_test_case(metric_name=metric))
45
46    def setup_generated_tests(self):
47        for level in list(
48                self.static_wifi_interference['interference_level'].values()):
49            for channels in self.static_wifi_interference['channels']:
50                arg_set = [(level, channels)]
51                self.generate_tests(
52                    test_logic=self.bt_range_with_static_wifi_interference,
53                    name_func=self.create_test_name,
54                    arg_sets=arg_set)
55
56    def create_test_name(self, level, channels):
57        str_channel_test = ''
58        for i in channels:
59            str_channel_test = str_channel_test + str(i) + "_"
60        test_case_name = ('test_bt_range_with_static_interference_level_{}_'
61                          'channel_{}'.format(level, str_channel_test))
62        return test_case_name
63
64    def bt_range_with_static_wifi_interference(self, level, channels):
65        """Test function to measure bt range under interference.
66
67        Args:
68            interference_level: wifi interference level
69            channels: wifi interference channels
70        """
71        #setup wifi interference by setting the correct attenuator
72        inject_static_wifi_interference(self.wifi_int_pairs, level, channels)
73        # Read interference RSSI
74        self.get_interference_rssi()
75        self.wifi_chan1_rssi_metric.metric_value = self.interference_rssi[0][
76            'rssi']
77        self.wifi_chan6_rssi_metric.metric_value = self.interference_rssi[1][
78            'rssi']
79        self.wifi_chan11_rssi_metric.metric_value = self.interference_rssi[2][
80            'rssi']
81        for atten in self.bt_attenuation_range:
82            # Set attenuation for BT link
83            self.attenuator.set_atten(atten)
84            [
85                rssi_master, pwl_master, rssi_c0_master, rssi_c1_master,
86                txpw_c0_master, txpw_c1_master, bftx_master, divtx_master
87            ], [rssi_slave] = self._get_bt_link_metrics()
88            rssi_primary = rssi_master.get(self.dut.serial, -127)
89            pwl_primary = pwl_master.get(self.dut.serial, -127)
90            rssi_secondary = rssi_slave.get(self.bt_device_controller.serial,
91                                            -127)
92            tag = 'attenuation_{}dB'.format(atten)
93            self.log.info(
94                'BT attenuation set to {} dB and start A2DP streaming'.format(
95                    atten))
96            procs_iperf = []
97            for obj in self.wifi_int_pairs:
98                obj.iperf_server.start()
99                self.log.info('Started IPERF server at port {}'.format(
100                    obj.iperf_server.port))
101                iperf_args = '-i 1 -t {} -p {} -J -R'.format(
102                    self.iperf_duration, obj.iperf_server.port)
103                tag = 'chan_{}'.format(obj.channel)
104                proc_iperf = Process(target=obj.iperf_client.start,
105                                     args=(obj.server_address, iperf_args, tag))
106                procs_iperf.append(proc_iperf)
107
108            #play a2dp streaming and run thdn analysis
109            queue = Queue()
110            proc_bt = Process(target=self.play_and_record_audio,
111                              args=(self.audio_params['duration'], queue))
112            for proc in procs_iperf:
113                proc.start()
114            proc_bt.start()
115            proc_bt.join()
116            for proc in procs_iperf:
117                proc.join()
118            for obj in self.wifi_int_pairs:
119                iperf_throughput = get_iperf_results(obj.iperf_server)
120                self.log.info(
121                    'Throughput for channel {} interference is {} Mbps'.format(
122                        obj.channel, iperf_throughput))
123                obj.iperf_server.stop()
124                self.log.info('Stopped IPERF server at port {}'.format(
125                    obj.iperf_server.port))
126            audio_captured = queue.get()
127            thdns = self.run_thdn_analysis(audio_captured, tag)
128            self.log.info('THDN results are {} at {} dB attenuation'.format(
129                thdns, atten))
130            self.log.info('DUT rssi {} dBm, master tx power level {}, '
131                          'RemoteDevice rssi {} dBm'.format(
132                              rssi_primary, pwl_primary, rssi_secondary))
133            for thdn in thdns:
134                if thdn >= self.audio_params['thdn_threshold']:
135                    self.log.info('Under the WiFi interference condition: '
136                                  'channel 1 RSSI: {} dBm, '
137                                  'channel 6 RSSI: {} dBm'
138                                  'channel 11 RSSI: {} dBm'.format(
139                                      self.interference_rssi[0]['rssi'],
140                                      self.interference_rssi[1]['rssi'],
141                                      self.interference_rssi[2]['rssi']))
142                    raise TestPass(
143                        'Max range for this test is {}, with BT master RSSI at'
144                        ' {} dBm'.format(atten, rssi_primary))
145
146