1import json
2import os
3from typing import Optional, List
4import time
5
6from acts import asserts
7from acts import signals
8import acts_contrib.test_utils.power.cellular.cellular_power_base_test as PWCEL
9from acts_contrib.test_utils.tel import tel_test_utils as telutils
10from acts_contrib.test_utils.power.cellular import modem_logs
11
12# TODO: b/261639867
class AtUtil():
    """Util class for sending AT commands through the mdstest instrumentation.

    Every AT request is wrapped in an ``am instrument`` command line that
    targets ``com.google.mdstest`` and is executed with ``dut.adb.shell``.

    Attributes:
        dut: AndroidDevice controller object.
        log: logger used to report commands, responses and failures.
    """
    # Shared head and tail of every AT command line built by this class.
    _AT_CMD_PREFIX = 'am instrument -w -e request '
    _AT_CMD_SUFFIX = (
        ' -e response wait '
        '"com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"')

    ADB_CMD_DISABLE_TXAS = _AT_CMD_PREFIX + 'at+googtxas=2' + _AT_CMD_SUFFIX
    ADB_CMD_GET_TXAS = _AT_CMD_PREFIX + 'at+googtxas?' + _AT_CMD_SUFFIX
    ADB_MODEM_STATUS = 'cat /sys/bus/platform/devices/cpif/modem_state'
    # Template: format with nv_name, nv_index and nv_value (see set_nv).
    ADB_CMD_SET_NV = (
        _AT_CMD_PREFIX
        + '\'at+googsetnv=\"{nv_name}\",{nv_index},\"{nv_value}\"\''
        + _AT_CMD_SUFFIX)

    # Number of one-second polls of the modem state before sending anyway.
    _MODEM_ONLINE_POLLS = 30
    # Seconds to wait after each NV write issued by lock_band and
    # disable_lock_band_lte.
    _NV_SETTLE_SECONDS = 2

    def __init__(self, dut, log) -> None:
        self.dut = dut
        self.log = log

    @classmethod
    def _at_cmd(cls, request: str) -> str:
        """Wrap a raw AT request into the full adb shell command line."""
        return cls._AT_CMD_PREFIX + request + cls._AT_CMD_SUFFIX

    # TODO: to be removed when b/261639867 is complete,
    # and we are using the parent method.
    def send(self, cmd: str, retries: int=5) -> Optional[str]:
        """Run a command with adb shell, retrying until it reports success.

        The modem state file is polled first (once per second, up to 30
        times) until it reads 'ONLINE'; the command is sent afterwards even
        if the modem never reported 'ONLINE'. A response counts as
        successful when it contains both 'SUCCESS' and 'OK'.

        Args:
            cmd: full adb shell command line to execute.
            retries: maximum number of attempts.

        Returns:
            The successful response, or the last (failed) response when all
            attempts failed. None when no attempt was made (retries <= 0).
        """
        for _ in range(self._MODEM_ONLINE_POLLS):
            modem_status = self.dut.adb.shell(self.ADB_MODEM_STATUS)
            self.log.debug(f'Modem status: {modem_status}')
            # Tolerate surrounding whitespace/newline in the state file.
            if str(modem_status).strip() == 'ONLINE':
                break
            time.sleep(1)

        wait_for_device_ready_time = 2
        # Pre-set so that `retries <= 0` returns None instead of raising
        # UnboundLocalError at the final return.
        res = None
        for _ in range(retries):
            res = self.dut.adb.shell(cmd)
            self.log.info(f'cmd sent: {cmd}')
            self.log.debug(f'response: {res}')
            if 'SUCCESS' in res and 'OK' in res:
                return res
            self.log.warning('Fail to execute cmd, retry to send again.')
            time.sleep(wait_for_device_ready_time)
        self.log.error(f'Fail to execute cmd: {cmd}')
        return res

    def lock_band(self):
        """Lock lte and nr bands.

        LTE bands to be locked include B1, B2, B4.
        NR bands to be locked include n71, n78, n260.

        Each NV write is followed by a short pause before the next one.
        """
        requests = (
            # enable lte band lock
            r'at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",00,\"01\"',
            # lock to B1, B2 and B4 (0x0B sets bits 0, 1 and 3 of bitmap 0)
            r'at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",0,\"0B,00,00,00,00,00,00,00\"',
            r'at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",1,\"00,00,00,00,00,00,00,00\"',
            r'at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",2,\"00,00,00,00,00,00,00,00\"',
            r'at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",3,\"00,00,00,00,00,00,00,00\"',
            # enable nr band list
            r'at+GOOGSETNV=\"!NRRRC.SIM_BASED_BAND_LIST_SUPPORT\",00,\"01\"',
            # n71 (0x0047), n78 (0x004E), n260 (0x0104), low byte first
            r'at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",00,\"47,00\"',
            r'at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",01,\"4E,00\"',
            r'at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",02,\"04,01\"',
        )
        for request in requests:
            self.send(self._at_cmd(request))
            time.sleep(self._NV_SETTLE_SECONDS)

    def disable_lock_band_lte(self):
        """Send the LTE band lock disable command."""
        # NOTE(review): this writes value "01", the same value lock_band
        # writes to enable the LTE band lock -- confirm whether "00" was
        # intended. Left unchanged because the NV semantics are not visible
        # here.
        adb_disable_band_lock_lte = self._at_cmd(
            r'at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",0,\"01\"')

        # disable band lock lte
        self.send(adb_disable_band_lock_lte)
        time.sleep(self._NV_SETTLE_SECONDS)

    def disable_txas(self):
        """Disable TXAS unless the modem already reports '+GOOGGETTXAS:2'.

        Returns:
            The raw query response when '+GOOGGETTXAS:2' is already
            reported; otherwise a bool telling whether the response to the
            disable command contains 'OK'.
        """
        res = self.send(self.ADB_CMD_GET_TXAS)
        if res and '+GOOGGETTXAS:2' in res:
            self.log.info('TXAS is in default.')
            return res
        response = self.send(self.ADB_CMD_DISABLE_TXAS)
        # `response` is None only when no attempt was made.
        return bool(response) and 'OK' in response

    def get_band_lock_info(self):
        """Query the band lock related NV items.

        Returns:
            The responses of all queries concatenated in query order.
        """
        requests = (
            r'at+GOOGGETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\"',
            r'at+GOOGGETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\"',
            r'at+GOOGGETNV=\"!NRRRC.SIM_BASED_BAND_LIST_SUPPORT\"',
            r'at+GOOGGETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\"',
        )
        return ''.join(
            self.send(self._at_cmd(request)) or '' for request in requests)

    def set_nv(self, nv_name, index, value):
        """Write one NV item.

        Args:
            nv_name: name of the NV item.
            index: index within the NV item.
            value: value to write.

        Returns:
            The response returned by send().
        """
        cmd = self.ADB_CMD_SET_NV.format(
            nv_name=nv_name,
            nv_index=index,
            nv_value=value
        )
        return self.send(cmd)

    def get_sim_slot_mapping(self):
        """Query the sim slot map (at+slotmap?) and return the response."""
        return self.send(self._at_cmd('at+slotmap?'))

    def set_single_psim(self):
        """Send at+slotmap=1 and return the response."""
        return self.send(self._at_cmd('at+slotmap=1'))

    def disable_dsp(self):
        """Write "00" to NASU.LCPU.LOG.SWITCH and return the response."""
        return self.send(self._at_cmd(
            r'at+googsetnv=\"NASU\.LCPU\.LOG\.SWITCH\",0,\"00\"'))

    def get_dsp_status(self):
        """Read NASU.LCPU.LOG.SWITCH and return the response."""
        return self.send(self._at_cmd(
            r'at+googgetnv=\"NASU\.LCPU\.LOG\.SWITCH\"'))

    def enable_ims_nr(self):
        """Write the NV items used to enable IMS over NR.

        The NR config mode NV names depend on the device model: 'oriole'
        uses NR.CONFIG.MODE and DS.NR.CONFIG.MODE, every other model uses
        NASU.NR.CONFIG.MODE.
        """
        nv_settings = [
            ('!NRCAPA.Gen.VoiceOverNr', '01'),
            ('PSS.AIMS.Enable.NRSACONTROL', '00'),
            ('DS.PSS.AIMS.Enable.NRSACONTROL', '00'),
        ]
        if self.dut.model == 'oriole':
            # For P21, NR.CONFIG.MODE/DS.NR.CONFIG.MODE
            nv_settings.append(('NR.CONFIG.MODE', '11'))
            nv_settings.append(('DS.NR.CONFIG.MODE', '11'))
        else:
            # For P22, NASU.NR.CONFIG.MODE to 11
            nv_settings.append(('NASU.NR.CONFIG.MODE', '11'))

        for nv_name, value in nv_settings:
            self.set_nv(nv_name=nv_name, index='0', value=value)
182
class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
    """Base class for cellular power tests that run against a preset UE.

    On top of the parent class it presets the device through AT commands,
    collects modem logs, derives modem power from the ODPM dump and the
    Kibble report, checks the result against the configured threshold and
    records the readings as sponge properties.
    """
    # Key for ODPM report
    ODPM_ENERGY_TABLE_NAME = 'PowerStats HAL 2.0 energy meter'
    ODPM_MODEM_CHANNEL_NAME = '[VSYS_PWR_MODEM]:Modem'

    # Pass fail threshold lower bound
    THRESHOLD_TOLERANCE_LOWER_BOUND_DEFAULT = 0.3

    # Key for custom_property in Sponge
    CUSTOM_PROP_KEY_BUILD_ID = 'build_id'
    CUSTOM_PROP_KEY_INCR_BUILD_ID = 'incremental_build_id'
    CUSTOM_PROP_KEY_BUILD_TYPE = 'build_type'
    CUSTOM_PROP_KEY_SYSTEM_POWER = 'system_power'
    CUSTOM_PROP_KEY_MODEM_BASEBAND = 'baseband'
    CUSTOM_PROP_KEY_MODEM_ODPM_POWER = 'modem_odpm_power'
    CUSTOM_PROP_KEY_DEVICE_NAME = 'device'
    CUSTOM_PROP_KEY_DEVICE_BUILD_PHASE = 'device_build_phase'
    CUSTOM_PROP_KEY_MODEM_KIBBLE_POWER = 'modem_kibble_power'
    CUSTOM_PROP_KEY_TEST_NAME = 'test_name'
    CUSTOM_PROP_KEY_MODEM_KIBBLE_WO_PCIE_POWER = 'modem_kibble_power_wo_pcie'
    CUSTOM_PROP_KEY_MODEM_KIBBLE_PCIE_POWER = 'modem_kibble_pcie_power'
    CUSTOM_PROP_KEY_RFFE_POWER = 'rffe_power'
    CUSTOM_PROP_KEY_MMWAVE_POWER = 'mmwave_power'
    CUSTOM_PROP_KEY_CURRENT_REFERENCE_TARGET = 'reference_target'
    # kibble report
    KIBBLE_SYSTEM_RECORD_NAME = '- name: default_device.C10_EVT_1_1.Monsoon:mA'
    MODEM_PCIE_RAIL_NAME_LIST = [
        'PP1800_L2C_PCIEG3',
        'PP1200_L9C_PCIE',
        'PP0850_L8C_PCIE'
    ]

    MODEM_RFFE_RAIL_NAME = 'VSYS_PWR_RFFE'

    MODEM_POWER_RAIL_NAME = 'VSYS_PWR_MODEM'

    MODEM_POWER_RAIL_WO_PCIE_NAME = 'VSYS_PWR_MODEM_W_O_PCIE'

    WEARABLE_POWER_RAIL = 'LTE_DC'

    WEARABLE_SOC_MODEM_RAIL = 'SOC_MODEM_USBHS'

    MODEM_MMWAVE_RAIL_NAME = 'VSYS_PWR_MMWAVE'

    MONSOON_RAIL_NAME = 'Monsoon:mW'

    # params key
    MONSOON_VOLTAGE_KEY = 'mon_voltage'

    MDSTEST_APP_APK_NAME = 'mdstest.apk'

    # The backslash before the space is part of the command sent to the
    # device; it is written as '\\ ' because '\ ' is an invalid escape
    # sequence in a non-raw string literal.
    ADB_CMD_ENABLE_ALWAYS_ON_LOGGING = (
        'am broadcast -n com.android.pixellogger/.receiver.AlwaysOnLoggingReceiver '
        '-a com.android.pixellogger.service.logging.LoggingService.ACTION_CONFIGURE_ALWAYS_ON_LOGGING '
        '-e intent_key_enable "true" '
        '-e intent_key_config "Lassen\\ default" '
        '--ei intent_key_max_log_size_mb 100 '
        '--ei intent_key_max_number_of_files 20'
    )
    ADB_CMD_DISABLE_ALWAYS_ON_LOGGING = (
        'am start-foreground-service -a '
        'com.android.pixellogger.service.logging.LoggingService.ACTION_STOP_LOGGING')

    ADB_CMD_TOGGLE_MODEM_LOG = 'setprop persist.vendor.sys.modem.logging.enable {state}'

    _ADB_GET_ACTIVE_NETWORK = ('dumpsys connectivity | '
                               'grep \'Active default network\'')

    def __init__(self, controllers):
        super().__init__(controllers)
        self.retryable_exceptions = signals.TestFailure
        self.power_rails = {}
        self.pcie_power = 0
        self.rffe_power = 0
        self.mmwave_power = 0
        self.modem_power = 0
        self.monsoon_power = 0
        self.kibble_error_range = 2
        self.system_power = 0
        self.odpm_power = 0

    def setup_class(self):
        """Preset the callbox and the UE once for the whole test class."""
        super().setup_class()

        # preset callbox
        is_fr2 = 'Fr2' in self.TAG
        self.cellular_simulator.switch_HCCU_settings(is_fr2=is_fr2)

        self.at_util = AtUtil(self.cellular_dut.ad, self.log)

        # preset UE.
        self.log.info(f'Bug report mode: {self.bug_report}')
        self.toggle_modem_log(False)
        self.log.info('Installing mdstest app.')
        self.install_apk()

        self.unpack_userparams(is_mdstest_supported=True)
        self.log.info(f'Supports mdstest: {self.is_mdstest_supported}')
        if self.is_mdstest_supported:
            # UE preset
            self.log.info('Disable antenna switch.')
            self.at_util.disable_txas()
            time.sleep(10)

            # set device to be data centric
            nv_result = self.at_util.set_nv(
                nv_name='!SAEL3.SAE_UE_OPERATION_MODE',
                index='0',
                value='03'
            )
            self.log.info(nv_result)

            self.at_util.lock_band()
            self.log.info('Band lock info: \n%s', self.at_util.get_band_lock_info())

            self.at_util.set_single_psim()

        self.unpack_userparams(is_wifi_only_device=False)

        # extract log only flag
        self.unpack_userparams(collect_log_only=False)
        # get sim type
        self.unpack_userparams(has_3gpp_sim=True)
        # extract time to take log after test
        self.unpack_userparams(post_test_log_duration=30)

        # toggle on/off APM for all devices
        self.log.info('Toggle APM on/off for all devices.')
        for ad in self.android_devices:
            telutils.toggle_airplane_mode_by_adb(self.log, ad, False)
            time.sleep(2)
            telutils.toggle_airplane_mode_by_adb(self.log, ad, True)
            time.sleep(2)

        # clear modem logs
        modem_logs.clear_modem_logging(self.cellular_dut.ad)

    def collect_power_data_and_validate(self):
        """Collect power data and validate it against the threshold.

        The simulator cell status is captured before and after the
        measurement; the test fails if the two differ.
        """
        cells_status_before = sorted(self.cellular_simulator.get_all_cell_status())
        self.log.info('UXM cell status before collect power: %s', cells_status_before)

        super().collect_power_data()
        cells_status_after = sorted(self.cellular_simulator.get_all_cell_status())
        self.log.info('UXM cell status after collect power: %s', cells_status_after)

        # power measurement results
        odpm_power_results = self.get_odpm_values()
        self.odpm_power = odpm_power_results.get(
            self.ODPM_MODEM_CHANNEL_NAME.lower(), 0)
        if hasattr(self, 'bitses'):
            self.parse_power_rails_csv()

        asserts.assert_true(cells_status_before == cells_status_after,
            'Cell status before {} and after {} the test run are not the same.'.format(
                cells_status_before, cells_status_after
            ))
        self.threshold_check()

    def setup_test(self):
        """Prepare modem logging and the per-test log folder.

        Raises:
            signals.TestFailure: when a BrokenPipeError is raised during
                setup; the simulator is recovered and reconnected first.
        """
        try:
            if self.collect_log_only:
                self.log.info('Collect log only mode on.')
                # set log mask
                modem_logs.set_modem_log_profle(self.cellular_dut.ad, modem_logs.ModemLogProfile.LASSEN_TCP_DSP)
                # start log
                modem_logs.start_modem_logging(self.cellular_dut.ad)
            modem_log_dir = os.path.join(self.root_output_path, 'modem_log')
            os.makedirs(modem_log_dir, exist_ok=True)
            self.modem_log_path = os.path.join(modem_log_dir, self.test_name)
            os.makedirs(self.modem_log_path, exist_ok=True)
            super().setup_test()
        except BrokenPipeError:
            self.log.info('TA crashed test need retry.')
            self.need_retry = True
            self.cellular_simulator.recovery_ta()
            self.cellular_simulator.socket_connect()
            raise signals.TestFailure('TA crashed mid test, retry needed.')

    def toggle_modem_log(self, new_state: bool, timeout: int=30):
        """Set the always-on modem logging property and wait for it to apply.

        Args:
            new_state: desired logging state.
            timeout: number of one-second polls to wait for the property to
                read back the requested value.

        Raises:
            RuntimeError: if the property does not reach the requested value
                within `timeout` polls.
        """
        get_state_cmd = 'getprop persist.vendor.sys.modem.logging.enable'
        # The property holds the lower-case text form of the boolean.
        target_state = str(new_state).lower()
        current_state = self.cellular_dut.ad.adb.shell(get_state_cmd)
        if target_state == current_state:
            return

        self.cellular_dut.ad.adb.shell(
            self.ADB_CMD_TOGGLE_MODEM_LOG.format(state=target_state))
        for _ in range(timeout):
            self.log.debug(f'Wait for modem logging status to be {target_state}.')
            time.sleep(1)
            current_state = self.cellular_dut.ad.adb.shell(get_state_cmd)
            if target_state == current_state:
                self.log.info(f'Always-on modem logging status is {target_state}.')
                return
        raise RuntimeError(f'Fail to set modem logging to {target_state}.')

    def collect_modem_log(self, out_path, duration: int=30):
        """Record modem logs for `duration` seconds and pull them.

        Args:
            out_path: host folder the logs are pulled to.
            duration: seconds to keep logging before stopping.
        """
        # set log mask
        modem_logs.set_modem_log_profle(self.cellular_dut.ad, modem_logs.ModemLogProfile.LASSEN_TCP_DSP)

        # start log
        modem_logs.start_modem_logging(self.cellular_dut.ad)
        time.sleep(duration)
        # stop log
        modem_logs.stop_modem_logging(self.cellular_dut.ad)
        try:
            # pull log
            modem_logs.pull_logs(self.cellular_dut.ad, out_path)
        finally:
            # clear log even when pulling failed
            modem_logs.clear_modem_logging(self.cellular_dut.ad)

    def install_apk(self):
        """Install the mdstest apk from custom_files if not yet installed."""
        sleep_time = 3
        for file in self.custom_files:
            if self.MDSTEST_APP_APK_NAME in file:
                if not self.cellular_dut.ad.is_apk_installed("com.google.mdstest"):
                    self.cellular_dut.ad.adb.install("-r -g %s" % file, timeout=300, ignore_status=True)
        time.sleep(sleep_time)
        if self.cellular_dut.ad.is_apk_installed("com.google.mdstest"):
            self.log.info('mdstest installed.')
        else:
            self.log.warning('fail to install mdstest.')

    def get_odpm_values(self):
        """Get power measure from ODPM.

        Parses the energy table in the 'after' ODPM dump of the current
        test and converts each channel's energy delta into average power
        (delta divided by the elapsed time in seconds).

        Returns:
            odpm_power_results: a dictionary
                has key as lower-case channel name,
                and value as power measurement of that channel.
                Empty when there is no ODPM folder, no dump file, no
                energy table, or no usable elapsed time.
        """
        self.log.info('Start calculating power by channel from ODPM report.')
        odpm_power_results = {}

        # device before P21 don't have ODPM reading
        if not self.odpm_folder:
            return odpm_power_results

        # getting ODPM modem power value
        odpm_file_name = '{}.{}.dumpsys_odpm_{}.txt'.format(
            self.__class__.__name__,
            self.current_test_name,
            'after')
        odpm_file_path = os.path.join(self.odpm_folder, odpm_file_name)
        if not os.path.exists(odpm_file_path):
            return odpm_power_results

        with open(odpm_file_path, 'r') as f:
            # find energy table in ODPM report
            for line in f:
                if self.ODPM_ENERGY_TABLE_NAME in line:
                    break
            else:
                self.log.warning(
                    'Energy table not found in ODPM report: %s', odpm_file_path)
                return odpm_power_results

            # get elapsed time between the 2 adb ODPM cmds (ms)
            elapsed_time_str = f.readline()
            self.log.info(elapsed_time_str)
            try:
                elapsed_time = float(elapsed_time_str
                                     .split(':')[1]
                                     .strip()
                                     .split(' ')[0])
            except (IndexError, ValueError):
                self.log.warning(
                    'Cannot parse ODPM elapsed time from: %r', elapsed_time_str)
                return odpm_power_results

            elapsed_time_s = elapsed_time / 1000
            if elapsed_time_s <= 0:
                # Guard the division below.
                self.log.warning(
                    'Invalid ODPM elapsed time: %r', elapsed_time_str)
                return odpm_power_results

            # skip column name row (tolerate a table that ends here)
            next(f, None)

            # get power of different channel from odpm report
            for line in f:
                if 'End' in line:
                    break
                # parse columns
                # example result of line.split()
                # ['[VSYS_PWR_DISPLAY]:Display', '1039108.42', 'mWs', '(', '344.69)']
                fields = line.split()
                if len(fields) < 2:
                    continue
                channel = fields[0].lower()
                # Strip only the parentheses around the last column; slicing
                # a fixed number of characters would drop a real digit.
                delta = float(fields[-1].strip('()'))

                # calculate ODPM power
                # delta is the difference in cumulative energy
                # between the 2 adb ODPM cmds
                power = delta / elapsed_time_s
                odpm_power_results[channel] = power
                self.log.info(
                    channel + ' ' + str(power) + ' mW'
                )
        return odpm_power_results

    def _is_any_substring(self, longer_word: str, word_list: List[str]) -> bool:
        """Check if any word in word list is a substring of a longer word."""
        return any(w in longer_word for w in word_list)

    def parse_power_rails_csv(self):
        """Read per-rail average power from the Kibble json of this test.

        Only records whose unit is 'mW' are used. The rail readings are
        stored on the instance and, when a modem power value was found, it
        is written to power_results under the test name.
        """
        # Start from zero on every call: these attributes are otherwise only
        # initialised in __init__, so the `+=` accumulations below would
        # carry values over from a previous call.
        self.pcie_power = 0
        self.rffe_power = 0
        self.mmwave_power = 0
        self.modem_power = 0
        self.monsoon_power = 0

        kibble_dir = os.path.join(self.root_output_path, 'Kibble')
        kibble_json_path = None
        if os.path.exists(kibble_dir):
            for f in os.listdir(kibble_dir):
                if self.test_name in f and '.json' in f:
                    kibble_json_path = os.path.join(kibble_dir, f)
                    self.log.info('Kibble json file path: ' + kibble_json_path)
                    break

        self.log.info('Parsing power rails from csv.')
        rails_data_json = None
        if kibble_json_path:
            with open(kibble_json_path, 'r') as f:
                rails_data_json = json.load(f)

        for record in rails_data_json or []:
            if record['unit'] != 'mW':
                continue
            railname = record['name']
            power = record['avg']
            # The order of the checks matters: the first matching rail
            # family wins.
            if self._is_any_substring(railname, self.MODEM_PCIE_RAIL_NAME_LIST):
                # pcie power is the sum of all pcie rails
                self.pcie_power += power
            elif self.MODEM_POWER_RAIL_NAME in railname:
                self.modem_power = power
            elif self.MODEM_RFFE_RAIL_NAME in railname:
                self.rffe_power = power
            elif self.MODEM_MMWAVE_RAIL_NAME in railname:
                self.mmwave_power = power
            elif self.MONSOON_RAIL_NAME in railname:
                self.monsoon_power = power
            elif (self.WEARABLE_POWER_RAIL in railname
                  or self.WEARABLE_SOC_MODEM_RAIL in railname):
                # wearable rails are summed into the modem power
                self.modem_power += power
            else:
                continue
            self.log.info('%s: %f', railname, power)

        if self.modem_power:
            self.power_results[self.test_name] = self.modem_power

    def sponge_upload(self):
        """Upload result to sponge as custom field."""
        # test name: CamelCase of the name parts, without 'preset'/'test'.
        # Empty parts (e.g. from a double underscore) are skipped so that
        # word[0] cannot raise IndexError.
        test_name_arr = self.current_test_name.split('_')
        test_name_for_sponge = ''.join(
            word[0].upper() + word[1:].lower()
            for word in test_name_arr
            if word and word not in ('preset', 'test')
        )

        # build info
        build_info = self.cellular_dut.ad.build_info
        build_id = build_info.get('build_id', 'Unknown')
        incr_build_id = build_info.get('incremental_build_id', 'Unknown')
        modem_base_band = self.cellular_dut.ad.adb.getprop(
            'gsm.version.baseband')
        build_type = build_info.get('build_type', 'Unknown')

        # device info
        device_info = self.cellular_dut.ad.device_info
        device_name = device_info.get('model', 'Unknown')
        device_build_phase = self.cellular_dut.ad.adb.getprop(
            'ro.boot.hardware.revision'
        )

        # if kibbles are using, get power from kibble
        modem_kibble_power_wo_pcie = 0
        if hasattr(self, 'bitses'):
            modem_kibble_power_wo_pcie = self.modem_power - self.pcie_power
            self.system_power = self.monsoon_power
        else:
            self.system_power = self.power_results.get(self.test_name, 0)

        # record reference target, if it exists
        self.reference_target = ''
        if self.threshold and self.test_name in self.threshold:
            self.reference_target = self.threshold[self.test_name]

        self.record_data({
            'Test Name': self.test_name,
            'sponge_properties': {
                self.CUSTOM_PROP_KEY_SYSTEM_POWER: self.system_power,
                self.CUSTOM_PROP_KEY_BUILD_ID: build_id,
                self.CUSTOM_PROP_KEY_INCR_BUILD_ID: incr_build_id,
                self.CUSTOM_PROP_KEY_MODEM_BASEBAND: modem_base_band,
                self.CUSTOM_PROP_KEY_BUILD_TYPE: build_type,
                self.CUSTOM_PROP_KEY_MODEM_ODPM_POWER: self.odpm_power,
                self.CUSTOM_PROP_KEY_DEVICE_NAME: device_name,
                self.CUSTOM_PROP_KEY_DEVICE_BUILD_PHASE: device_build_phase,
                self.CUSTOM_PROP_KEY_MODEM_KIBBLE_POWER: self.modem_power,
                self.CUSTOM_PROP_KEY_TEST_NAME: test_name_for_sponge,
                self.CUSTOM_PROP_KEY_MODEM_KIBBLE_WO_PCIE_POWER: modem_kibble_power_wo_pcie,
                self.CUSTOM_PROP_KEY_MODEM_KIBBLE_PCIE_POWER: self.pcie_power,
                self.CUSTOM_PROP_KEY_RFFE_POWER: self.rffe_power,
                self.CUSTOM_PROP_KEY_MMWAVE_POWER: self.mmwave_power,
                self.CUSTOM_PROP_KEY_CURRENT_REFERENCE_TARGET: self.reference_target
            },
        })

    def threshold_check(self):
        """Check the test result and decide if it passed or failed.

        The threshold is provided in the config file. In this class the
        compared value is the modem power parsed from the Kibble 'mW'
        records (minus the pcie power when the threshold's 'modem_rail'
        selects the rail without pcie).

        The check is skipped (with an error log) when no threshold exists
        for the test or when the instance has no 'bitses' attribute.
        """

        if not self.threshold or self.test_name not in self.threshold:
            self.log.error("No threshold is provided for the test '{}' in "
                           "the configuration file.".format(self.test_name))
            return

        if not hasattr(self, 'bitses'):
            self.log.error("No bitses attribute found, threshold cannot be "
                           "checked against system power.")
            return

        average_current = self.modem_power
        if self.threshold.get('modem_rail') == self.MODEM_POWER_RAIL_WO_PCIE_NAME:
            average_current = average_current - self.pcie_power
        current_threshold = self.threshold[self.test_name]

        # Each tolerance band is at least kibble_error_range wide.
        acceptable_upper_difference = max(
            current_threshold * self.pass_fail_tolerance,
            self.kibble_error_range
        )
        self.log.info('acceptable upper difference ' + str(acceptable_upper_difference))

        self.unpack_userparams(pass_fail_tolerance_lower_bound=self.THRESHOLD_TOLERANCE_LOWER_BOUND_DEFAULT)
        acceptable_lower_difference = max(
            current_threshold * self.pass_fail_tolerance_lower_bound,
            self.kibble_error_range)
        self.log.info('acceptable lower diff ' + str(acceptable_lower_difference))

        if not average_current:
            asserts.fail(
                'Something happened, measurement is not complete, test failed')

        asserts.assert_true(
            average_current < current_threshold + acceptable_upper_difference,
            'Measured average current in [{}]: {:.2f}mW, which is '
            'out of the acceptable upper range {:.2f}+{:.2f}mW'.format(
                self.test_name, average_current, current_threshold,
                acceptable_upper_difference))

        asserts.assert_true(
            average_current > current_threshold - acceptable_lower_difference,
            'Measured average current in [{}]: {:.2f}mW, which is '
            'out of the acceptable lower range {:.2f}-{:.2f}mW'.format(
                self.test_name, average_current, current_threshold,
                acceptable_lower_difference))

        asserts.explicit_pass(
            'Measured average current in [{}]: {:.2f}mW, which is '
            'within the acceptable range of {:.2f}-{:.2f} and {:.2f}+{:.2f}'.format(
                self.test_name, average_current, current_threshold,
                acceptable_lower_difference, current_threshold, acceptable_upper_difference))

    def _get_device_network(self) -> str:
        """Get active network on device.

        Returns:
        Information of active network in string.
        """
        return self.dut.adb.shell(
            self._ADB_GET_ACTIVE_NETWORK)

    def teardown_test(self):
        """Collect logs, restore the device state and upload the results."""
        if self.collect_log_only:
            try:
                # stop log
                modem_logs.stop_modem_logging(self.cellular_dut.ad)
                # pull log
                modem_logs.pull_logs(self.cellular_dut.ad, self.modem_log_path)
            finally:
                # clear log
                modem_logs.clear_modem_logging(self.cellular_dut.ad)
        else:
            if self.is_mdstest_supported:
                try:
                    self.collect_modem_log(self.modem_log_path, self.post_test_log_duration)
                except RuntimeError:
                    self.log.warning('Fail to collect log before test end.')
        self.log.info('===>Before test end info.<====')
        cells_status = self.cellular_simulator.get_all_cell_status()
        self.log.info('UXM cell status: %s', cells_status)
        active_network = self._get_device_network()
        self.log.info('Device network: %s', active_network)
        super().teardown_test()
        # restore device to ready state for next test
        if not self.is_wifi_only_device:
            self.log.info('Enable mobile data.')
            self.cellular_dut.ad.adb.shell('svc data enable')
        self.cellular_simulator.detach()
        self.cellular_dut.toggle_airplane_mode(True)

        if self.is_mdstest_supported:
            self.at_util.disable_dsp()
            self.log.info('Band lock info: \n%s', self.at_util.get_band_lock_info())
            self.log.info('Sim slot map: \n%s', self.at_util.get_sim_slot_mapping())
            # Call the method: logging the bound method object would print
            # its repr instead of the DSP status.
            self.log.info('DSP status: \n%s', self.at_util.get_dsp_status())

        # processing result
        self.sponge_upload()
679