1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import time
18from enum import Enum
19
20from acts.controllers.rohdeschwarz_lib import cmx500
21from acts.controllers.rohdeschwarz_lib.cmx500 import LteBandwidth
22from acts.controllers.rohdeschwarz_lib.cmx500 import LteState
23from acts.controllers import cellular_simulator as cc
24from acts.controllers.cellular_lib import LteSimulation
25
26CMX_TM_MAPPING = {
27    LteSimulation.TransmissionMode.TM1: cmx500.TransmissionModes.TM1,
28    LteSimulation.TransmissionMode.TM2: cmx500.TransmissionModes.TM2,
29    LteSimulation.TransmissionMode.TM3: cmx500.TransmissionModes.TM3,
30    LteSimulation.TransmissionMode.TM4: cmx500.TransmissionModes.TM4,
31    LteSimulation.TransmissionMode.TM7: cmx500.TransmissionModes.TM7,
32    LteSimulation.TransmissionMode.TM8: cmx500.TransmissionModes.TM8,
33    LteSimulation.TransmissionMode.TM9: cmx500.TransmissionModes.TM9,
34}
35
36CMX_SCH_MAPPING = {
37    LteSimulation.SchedulingMode.STATIC: cmx500.SchedulingMode.USERDEFINEDCH
38}
39
40CMX_MIMO_MAPPING = {
41    LteSimulation.MimoMode.MIMO_1x1: cmx500.MimoModes.MIMO1x1,
42    LteSimulation.MimoMode.MIMO_2x2: cmx500.MimoModes.MIMO2x2,
43    LteSimulation.MimoMode.MIMO_4x4: cmx500.MimoModes.MIMO4x4,
44}
45
46
47class ConfigurationMode(Enum):
48    Power = "Power"
49
50
51class CMX500CellularSimulator(cc.AbstractCellularSimulator):
52    """ A cellular simulator for telephony simulations based on the CMX 500
53    controller. """
54
55    # The maximum power that the equipment is able to transmit
56    MAX_DL_POWER = -25
57
58    def __init__(self,
59                 ip_address,
60                 port='5025',
61                 config_mode=None):
62        """ Initializes the cellular simulator.
63
64        Args:
65            ip_address: the ip address of the CMX500
66            port: the port number for the CMX500 controller
67            config_mode: A pre-defined configuration mode to use.
68        """
69        super().__init__()
70        try:
71            self.cmx = cmx500.Cmx500(ip_address, port)
72        except:
73            raise cc.CellularSimulatorError('Error when Initializes CMX500.')
74
75        self._config_mode = config_mode
76        self.bts = self.cmx.bts
77
78    def destroy(self):
79        """ Sends finalization commands to the cellular equipment and closes
80        the connection. """
81        self.log.info('destroy the cmx500 simulator')
82        self.cmx.disconnect()
83
84    def set_config_mode(self, config_mode=None):
85        """Sets config mode for the cmx 500 simulator."""
86        self._config_mode = config_mode
87
88    def configure_lte_bts(self, config, bts_index):
89        """ Commands the equipment to setup an LTE base station with the
90        required configuration except drx.
91
92        Args:
93            config: an LteSimulation.BtsConfig object.
94            bts_index: the base station number.
95        """
96        self.configure_lte_bts_base(config, bts_index)
97
98    def configure_lte_bts_after_started(self, config, bts_index):
99
100        if config.drx_connected_mode:
101            self.set_cdrx_config(bts_index, config)
102        if config.disable_all_ul_subframes:
103            self.bts[bts_index].disable_all_ul_subframes()
104        self.log.info(
105            'The radio connectivity after lte started config is {}'.format(
106                self.cmx.dut.state.radio_connectivity))
107
108    def configure_nr_bts_after_started(self, config, bts_index):
109        if config.drx_connected_mode:
110            self.set_cdrx_config(bts_index, config)
111        if config.config_flexible_slots:
112            self.bts[bts_index].config_flexible_slots()
113        if config.disable_all_ul_slots:
114            self.bts[bts_index].disable_all_ul_slots()
115        self.log.info(
116            'The radio connectivity after nr started config is {}'.format(
117                self.cmx.dut.state.radio_connectivity))
118
119    def setup_lte_scenario(self):
120        """ Configures the equipment for an LTE simulation. """
121        self.log.info('setup lte scenario')
122        self.cmx.switch_lte_signalling(cmx500.LteState.LTE_ON)
123
124    def setup_nr_sa_scenario(self):
125        """ Configures the equipment for an NR stand alone simulation. """
126        raise NotImplementedError()
127
128    def setup_nr_nsa_scenario(self):
129        """ Configures the equipment for an NR non stand alone simulation. """
130        self.log.info('setup nsa scenario (start lte cell and nr cell')
131        self.cmx.switch_on_nsa_signalling()
132
133    def set_band_combination(self, bands, mimo_modes):
134        """ Prepares the test equipment for the indicated band/mimo combination.
135
136        Args:
137            bands: a list of bands represented as ints or strings
138            mimo_modes: a list of LteSimulation.MimoMode to use for each carrier
139        """
140        self.num_carriers = len(bands)
141
142        # Don't configure secondary cells if we're using the built-in power
143        # configuration.
144        if self._config_mode != ConfigurationMode.Power:
145            self.cmx.set_band_combination(bands)
146
147    def set_lte_rrc_state_change_timer(self, enabled, time=10):
148        """ Configures the LTE RRC state change timer.
149
150        Args:
151            enabled: a boolean indicating if the timer should be on or off.
152            time: time in seconds for the timer to expire
153        """
154        self.log.info('set timer enabled to {} and the time to {}'.format(
155            enabled, time))
156        self.cmx.rrc_state_change_time_enable = enabled
157        self.cmx.lte_rrc_state_change_timer = time
158
159    def set_band(self, bts_index, band):
160        """ Sets the band for the indicated base station.
161
162        Args:
163            bts_index: the base station number
164            band: the new band
165        """
166        self.log.info('set band to {}'.format(band))
167        self.bts[bts_index].set_band(int(band))
168
169    def set_cdrx_config(self, bts_index, config):
170        """ Sets the tdd configuration number for the indicated base station.
171
172        Args:
173            bts_index: the base station number
174            config: the config including cdrx parameters
175        """
176        self.log.info('set cdrx config for bts {} to {}'.format(
177            bts_index, config)
178        )
179        self.bts[bts_index].set_cdrx_config(config)
180
181    def get_duplex_mode(self, band):
182        """ Determines if the band uses FDD or TDD duplex mode
183
184        Args:
185            band: a band number
186
187        Returns:
188            an variable of class DuplexMode indicating if band is FDD or TDD
189        """
190        if 33 <= int(band) <= 46:
191            return cmx500.DuplexMode.TDD
192        else:
193            return cmx500.DuplexMode.FDD
194
195    def set_input_power(self, bts_index, input_power):
196        """ Sets the input power for the indicated base station.
197
198        Args:
199            bts_index: the base station number
200            input_power: the new input power
201        """
202        if input_power > 23:
203            self.log.warning('Open loop supports -50dBm to 23 dBm. '
204                             'Setting it to max power 23 dBm')
205            input_power = 23
206        self.log.info('set input power to {}'.format(input_power))
207        self.bts[bts_index].set_ul_power(input_power)
208
209    def set_output_power(self, bts_index, output_power):
210        """ Sets the output power for the indicated base station.
211
212        Args:
213            bts_index: the base station number
214            output_power: the new output power
215        """
216        self.log.info('set output power to {}'.format(output_power))
217        self.bts[bts_index].set_dl_power(output_power)
218
219    def set_tdd_config(self, bts_index, tdd_config):
220        """ Sets the tdd configuration number for the indicated base station.
221
222        Args:
223            bts_index: the base station number
224            tdd_config: the new tdd configuration number (from 0 to 6)
225        """
226        self.log.info('set tdd config to {}'.format(tdd_config))
227        self.bts[bts_index].set_tdd_config(tdd_config)
228
229    def set_ssf_config(self, bts_index, ssf_config):
230        """ Sets the Special Sub-Frame config number for the indicated
231        base station.
232
233        Args:
234            bts_index: the base station number
235            ssf_config: the new ssf config number (from 0 to 9)
236        """
237        self.log.info('set ssf config to {}'.format(ssf_config))
238        self.bts[bts_index].set_ssf_config(ssf_config)
239
240    def set_bandwidth(self, bts_index, bandwidth):
241        """ Sets the bandwidth for the indicated base station.
242
243        Args:
244            bts_index: the base station number
245            bandwidth: the new bandwidth in MHz
246        """
247        self.log.info('set bandwidth of bts {} to {}'.format(
248            bts_index, bandwidth))
249        self.bts[bts_index].set_bandwidth(int(bandwidth))
250
251    def set_downlink_channel_number(self, bts_index, channel_number):
252        """ Sets the downlink channel number for the indicated base station.
253
254        Args:
255            bts_index: the base station number
256            channel_number: the new channel number (earfcn)
257        """
258        self.log.info(
259            'Sets the downlink channel number to {}'.format(channel_number))
260        self.bts[bts_index].set_dl_channel(channel_number)
261
262    def set_mimo_mode(self, bts_index, mimo_mode):
263        """ Sets the mimo mode for the indicated base station.
264
265        Args:
266            bts_index: the base station number
267            mimo_mode: the new mimo mode
268        """
269        self.log.info('set mimo mode to {}'.format(mimo_mode))
270        mimo_mode = CMX_MIMO_MAPPING[mimo_mode]
271        self.bts[bts_index].set_mimo_mode(mimo_mode)
272
273    def set_transmission_mode(self, bts_index, tmode):
274        """ Sets the transmission mode for the indicated base station.
275
276        Args:
277            bts_index: the base station number
278            tmode: the new transmission mode
279        """
280        self.log.info('set TransmissionMode to {}'.format(tmode))
281        tmode = CMX_TM_MAPPING[tmode]
282        self.bts[bts_index].set_transmission_mode(tmode)
283
284    def set_scheduling_mode(self,
285                            bts_index,
286                            scheduling,
287                            mcs_dl=None,
288                            mcs_ul=None,
289                            nrb_dl=None,
290                            nrb_ul=None):
291        """ Sets the scheduling mode for the indicated base station.
292
293        Args:
294            bts_index: the base station number.
295            scheduling: the new scheduling mode.
296            mcs_dl: Downlink MCS.
297            mcs_ul: Uplink MCS.
298            nrb_dl: Number of RBs for downlink.
299            nrb_ul: Number of RBs for uplink.
300        """
301        if scheduling not in CMX_SCH_MAPPING:
302            raise cc.CellularSimulatorError(
303                "This scheduling mode is not supported")
304        log_list = []
305        if mcs_dl:
306            log_list.append('mcs_dl: {}'.format(mcs_dl))
307        if mcs_ul:
308            log_list.append('mcs_ul: {}'.format(mcs_ul))
309        if nrb_dl:
310            log_list.append('nrb_dl: {}'.format(nrb_dl))
311        if nrb_ul:
312            log_list.append('nrb_ul: {}'.format(nrb_ul))
313
314        self.log.info('set scheduling mode to {}'.format(','.join(log_list)))
315        self.bts[bts_index].set_scheduling_mode(mcs_dl=mcs_dl,
316                                                mcs_ul=mcs_ul,
317                                                nrb_dl=nrb_dl,
318                                                nrb_ul=nrb_ul)
319
320    def set_dl_256_qam_enabled(self, bts_index, enabled):
321        """ Determines what MCS table should be used for the downlink.
322
323        Args:
324            bts_index: the base station number
325            enabled: whether 256 QAM should be used
326        """
327        self.log.info('Set 256 QAM DL MCS enabled: ' + str(enabled))
328        self.bts[bts_index].set_dl_modulation_table(
329            cmx500.ModulationType.Q256 if enabled else cmx500.ModulationType.
330            Q64)
331
332    def set_ul_64_qam_enabled(self, bts_index, enabled):
333        """ Determines what MCS table should be used for the uplink.
334
335        Args:
336            bts_index: the base station number
337            enabled: whether 64 QAM should be used
338        """
339        self.log.info('Set 64 QAM UL MCS enabled: ' + str(enabled))
340        self.bts[bts_index].set_ul_modulation_table(
341            cmx500.ModulationType.Q64 if enabled else cmx500.ModulationType.Q16
342        )
343
344    def set_mac_padding(self, bts_index, mac_padding):
345        """ Enables or disables MAC padding in the indicated base station.
346
347        Args:
348            bts_index: the base station number
349            mac_padding: the new MAC padding setting
350        """
351        self.log.info('set mac pad on {}'.format(mac_padding))
352        self.bts[bts_index].set_dl_mac_padding(mac_padding)
353
354    def set_cfi(self, bts_index, cfi):
355        """ Sets the Channel Format Indicator for the indicated base station.
356
357        Args:
358            bts_index: the base station number
359            cfi: the new CFI setting
360        """
361        if cfi == 'BESTEFFORT':
362            self.log.info('The cfi is BESTEFFORT, use default value')
363            return
364        try:
365            index = int(cfi) + 1
366        except Exception as e:
367            index = 1
368        finally:
369            self.log.info('set the cfi and the cfi index is {}'.format(index))
370            self.bts[bts_index].set_cfi(index)
371
372    def set_paging_cycle(self, bts_index, cycle_duration):
373        """ Sets the paging cycle duration for the indicated base station.
374
375        Args:
376            bts_index: the base station number
377            cycle_duration: the new paging cycle duration in milliseconds
378        """
379        self.log.warning('The set_paging_cycle method is not implememted, '
380                         'use default value')
381
382    def set_phich_resource(self, bts_index, phich):
383        """ Sets the PHICH Resource setting for the indicated base station.
384
385        Args:
386            bts_index: the base station number
387            phich: the new PHICH resource setting
388        """
389        self.log.warning('The set_phich_resource method is not implememted, '
390                         'use default value')
391
392    def set_tracking_area(self, bts_index, tac):
393        """ Assigns the cell to a specific tracking area.
394
395        Args:
396            tac: the unique tac to assign the cell to.
397        """
398        self.bts[bts_index].set_tracking_area(tac)
399
400    def lte_attach_secondary_carriers(self, ue_capability_enquiry):
401        """ Activates the secondary carriers for CA. Requires the DUT to be
402        attached to the primary carrier first.
403
404        Args:
405            ue_capability_enquiry: UE capability enquiry message to be sent to
406        the UE before starting carrier aggregation.
407        """
408        self.wait_until_communication_state()
409
410        # primary cell is attached, now turn on all secondary cells
411        if self.cmx.secondary_cells:
412            self.cmx.turn_on_secondary_cells()
413
414        # if a primary lte and primary nr cell exist, then activate endc on
415        # primary nr cell
416        is_nsa = self.cmx.primary_lte_cell and self.cmx.primary_nr_cell
417
418        if is_nsa:
419            self.cmx.primary_nr_cell.activate_endc()
420
421        # attach secondary lte and nr cells
422        # if nsa then nr cells should be added to secondary cell group
423        for bts in self.cmx.secondary_lte_cells:
424            bts.attach_as_secondary_cell()
425
426        for bts in self.cmx.secondary_nr_cells:
427            bts.attach_as_secondary_cell(scg=is_nsa)
428
429        if self._config_mode and self._config_mode == ConfigurationMode.Power:
430            self.configure_for_power_measurement()
431
432        self.log.info('The radio connectivity is {}'.format(
433            self.cmx.dut.state.radio_connectivity))
434
435    def configure_for_power_measurement(self):
436        """ Applies a pre-defined configuration for PDCCH power testing."""
437        self.log.info('set lte cdrx for nr nsa scenario')
438        for bts in self.cmx.lte_cells:
439            bts.set_default_cdrx_config()
440        time.sleep(5)
441
442        self.log.info('Disables mac padding')
443        for bts in self.bts:
444            bts.set_dl_mac_padding(False)
445        time.sleep(5)
446
447        self.log.info('configure flexible slots and wait for 5 seconds')
448        for bts in self.cmx.nr_cells:
449            bts.config_flexible_slots()
450        time.sleep(5)
451
452        self.log.info('disable all ul subframes of the lte cell')
453        for bts in self.cmx.lte_cells:
454            bts.disable_all_ul_subframes()
455        time.sleep(30)
456
457        self.log.info('Disables Nr UL slots')
458        for bts in self.cmx.nr_cells:
459            bts.disable_all_ul_slots()
460        time.sleep(5)
461
462    def wait_until_attached(self, timeout=120):
463        """ Waits until the DUT is attached to the primary carrier.
464
465        Args:
466            timeout: after this amount of time the method will raise a
467                CellularSimulatorError exception. Default is 120 seconds.
468        """
469        self.log.info('wait until attached')
470        if len(self.cmx.tracking_areas) > 1:
471            self.log.info('turning off neighbor cells')
472            self.cmx.turn_on_primary_cells()
473            self.cmx.turn_off_neighbor_cells()
474
475        self.cmx.wait_until_attached(timeout)
476
477    def wait_until_communication_state(self, timeout=120):
478        """ Waits until the DUT is in Communication state.
479
480        Args:
481            timeout: after this amount of time the method will raise a
482                CellularSimulatorError exception. Default is 120 seconds.
483        Return:
484            True if cmx reach rrc state within timeout
485        Raise:
486            CmxError if tiemout
487        """
488        self.log.info('wait for rrc on state')
489        return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_ON, timeout)
490
491    def wait_until_idle_state(self, timeout=120):
492        """ Waits until the DUT is in Idle state.
493
494        Args:
495            timeout: after this amount of time the method will raise a
496                CellularSimulatorError exception. Default is 120 seconds.
497        Return:
498            True if cmx reach rrc state within timeout
499        Raise:
500            CmxError if tiemout
501        """
502        self.log.info('wait for rrc off state')
503        return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_OFF, timeout)
504
505    def wait_until_quiet(self, timeout=120):
506        """Waits for all pending operations to finish on the simulator.
507
508        Args:
509            timeout: after this amount of time the method will raise a
510                CellularSimulatorError exception. Default is 120 seconds.
511        """
512        self.cmx.network_apply_changes()
513
514    def detach(self):
515        """ Turns off all the base stations so the DUT loose connection."""
516        self.log.info('Bypass simulator detach step for now')
517
518    def stop(self):
519        """ Stops current simulation. After calling this method, the simulator
520        will need to be set up again. """
521        self.log.info('Stops current simulation and disconnect cmx500')
522        self.cmx.disconnect()
523
524    def start_data_traffic(self):
525        """ Starts transmitting data from the instrument to the DUT. """
526        self.log.warning('The start_data_traffic is not implemented yet')
527
528    def stop_data_traffic(self):
529        """ Stops transmitting data from the instrument to the DUT. """
530        self.log.warning('The stop_data_traffic is not implemented yet')
531
532    def send_sms(self, message):
533        """ Sends an SMS message to the DUT.
534
535        Args:
536            message: the SMS message to send.
537        """
538        self.cmx.send_sms(message)
539