1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#           http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import time
19import sys
20
21from enum import Enum
22from os import path
23from acts.controllers import abstract_inst
24
# Directory that is put on sys.path (when the caller does not pass another
# one) so that the XLAPI python packages can be imported.
DEFAULT_XLAPI_PATH = (
    '/home/mobileharness/Rohde-Schwarz/XLAPI/latest/venv/lib/python3.7/'
    'site-packages')

# Initial value of Cmx500.lte_rrc_state_change_timer.
# NOTE(review): presumably seconds -- confirm against the users of the timer.
DEFAULT_LTE_STATE_CHANGE_TIMER = 10

# Initial value of Cmx500.cell_switch_on_timer; the cell state is polled
# roughly once per second, so this is effectively a number of seconds.
DEFAULT_CELL_SWITCH_ON_TIMER = 60

# NOTE(review): not referenced in the visible part of this module; presumably
# the EN-DC establishment timeout in seconds -- confirm.
DEFAULT_ENDC_TIMER = 300

# Module wide logger shared by the controller and base station classes.
logger = logging.getLogger('Xlapi_cmx500')
31
# Names of the LTE cell properties handled by this module.
# NOTE(review): not consumed in the visible part of the file; presumably
# matched against XLAPI getter/setter names -- confirm before reordering.
LTE_CELL_PROPERTIES = [
    # band and channel settings
    'band', 'bandwidth', 'dl_earfcn', 'ul_earfcn',
    # power settings
    'total_dl_power', 'p_b', 'dl_epre', 'ref_signal_power',
    # remaining settings
    'm', 'beamforming_antenna_ports', 'p0_nominal_pusch',
]
45
# Lookup table used to translate a bandwidth in MHz into a number of resource
# blocks. Each entry is (exclusive upper bound in MHz, resource blocks) and
# the entries are sorted by ascending bound; bandwidths that are not below
# any bound are handled by the caller.
LTE_MHZ_UPPER_BOUND_TO_RB = list(
    zip(
        (1.5, 4.0, 7.5, 12.5, 17.5),  # upper bounds in MHz
        (6, 15, 25, 50, 75),  # matching resource block counts
    ))
53
54
class DciFormat(Enum):
    """DCI formats supported for MIMO configurations.

    The members are numbered sequentially starting at 1.
    """
    # Format 0
    DCI_FORMAT_0 = 1
    # Format 1 family
    DCI_FORMAT_1 = 2
    DCI_FORMAT_1A = 3
    DCI_FORMAT_1B = 4
    DCI_FORMAT_1C = 5
    # Format 2 family
    DCI_FORMAT_2 = 6
    DCI_FORMAT_2A = 7
    DCI_FORMAT_2B = 8
    DCI_FORMAT_2C = 9
    DCI_FORMAT_2D = 10
67
68
class DuplexMode(Enum):
    """Duplex modes a cell can report.

    Each member's value is the same text as its name.
    """
    FDD = 'FDD'  # frequency division duplex
    TDD = 'TDD'  # time division duplex
    DL_ONLY = 'DL_ONLY'  # downlink only band
74
75
class LteBandwidth(Enum):
    """LTE channel bandwidths that are supported.

    The value of every member is the number of resource blocks (RB) that
    corresponds to the bandwidth named by the member.
    """
    BANDWIDTH_1MHz = 6  # 6 resource blocks
    BANDWIDTH_3MHz = 15  # 15 resource blocks
    BANDWIDTH_5MHz = 25  # 25 resource blocks
    BANDWIDTH_10MHz = 50  # 50 resource blocks
    BANDWIDTH_15MHz = 75  # 75 resource blocks
    BANDWIDTH_20MHz = 100  # 100 resource blocks
84
85
class LteState(Enum):
    """Requested state of the LTE signalling (switched on or off)."""
    LTE_ON = 'ON'  # LTE signalling enabled
    LTE_OFF = 'OFF'  # LTE signalling disabled
90
91
class MimoModes(Enum):
    """MIMO modes; the value is the number of downlink antennas."""
    MIMO1x1 = 1  # one dl antenna
    MIMO2x2 = 2  # two dl antennas
    MIMO4x4 = 4  # four dl antennas
97
98
class ModulationType(Enum):
    """Modulation types that are supported.

    The members are numbered sequentially starting at 0.
    """
    Q16 = 0
    Q64 = 1
    Q256 = 2
104
105
class NasState(Enum):
    """NAS registration state between the callbox and the dut."""
    DEREGISTERED = 'OFF'  # not registered
    EMM_REGISTERED = 'EMM'  # EMM registered
    MM5G_REGISTERED = 'NR'  # 5G MM registered
111
112
class RrcState(Enum):
    """Target states used to enable or disable rrc."""
    RRC_ON = 'ON'  # rrc enabled
    RRC_OFF = 'OFF'  # rrc disabled
117
118
class RrcConnectionState(Enum):
    """Possible RRC connection states of the DUT.

    The members are numbered sequentially starting at 1; the controller
    builds a member from the value of the state reported for the dut.
    """
    # idle states
    IDLE = 1
    IDLE_PAGING = 2
    IDLE_CONNECTION_ESTABLISHMENT = 3
    # connected states
    CONNECTED = 4
    CONNECTED_CONNECTION_REESTABLISHMENT = 5
    CONNECTED_SCG_FAILURE = 6
    CONNECTED_HANDOVER = 7
    CONNECTED_CONNECTION_RELEASE = 8
129
130
class SchedulingMode(Enum):
    """Scheduling modes that are supported (user defined channels only)."""
    USERDEFINEDCH = 'UDCHannels'  # user defined channels
134
135
class TransmissionModes(Enum):
    """Transmission modes that are supported.

    The value of a member is the number of the transmission mode; modes 5
    and 6 are not part of the enumeration.
    """
    TM1 = 1
    TM2 = 2
    TM3 = 3
    TM4 = 4
    # no members for the modes 5 and 6
    TM7 = 7
    TM8 = 8
    TM9 = 9
145
146
# Maximum number of layers to configure for each mimo mode.
# For mimo 1x1 the entry is 2 as well, in that case also
# set_num_crs_antenna_ports to 1.
MIMO_MAX_LAYER_MAPPING = dict((
    (MimoModes.MIMO1x1, 2),
    (MimoModes.MIMO2x2, 2),
    (MimoModes.MIMO4x4, 4),
))
153
154
class Cmx500(abstract_inst.SocketInstrument):
    """Controller for a CMX500 callbox that is driven through XLAPI.

    The controller keeps the list of base stations (self.bts) that wrap the
    XLAPI cells of the current network and offers helpers to switch cells
    on/off and to query the state of the dut.
    """
    def __init__(self, ip_addr, port, xlapi_path=DEFAULT_XLAPI_PATH):
        """Init method to setup variables for the controller.

        Args:
              ip_addr: Controller's ip address.
              port: Port.
              xlapi_path: directory that is inserted into sys.path (if it is
                  not there yet) so that the XLAPI packages can be imported.
        """

        # keeps the socket connection for debug purpose for now
        super().__init__(ip_addr, port)
        if xlapi_path not in sys.path:
            sys.path.insert(0, xlapi_path)
        self._initial_xlapi()
        self._settings.system.set_instrument_address(ip_addr)
        logger.info('The instrument address is {}'.format(
            self._settings.system.get_instrument_address()))

        self.bts = []

        # Stops all active cells if there is any
        self.clear_network()

        # initialize one lte and nr cell
        self._add_lte_cell()
        self._add_nr_cell()

        self.lte_rrc_state_change_timer = DEFAULT_LTE_STATE_CHANGE_TIMER
        self.rrc_state_change_time_enable = False
        self.cell_switch_on_timer = DEFAULT_CELL_SWITCH_ON_TIMER

    def set_band_combination(self, bands):
        """ Prepares the test equipment for the indicated band combination.

        The network is wiped first, then one cell is created per entry of
        bands. Entries that are strings starting with 'n' create NR cells,
        all other entries create LTE cells.

        Args:
            bands: a list of bands represented as ints or strings
        """
        self.clear_network()

        n_lte_cells = 1
        n_nr_cells = 1
        for band in bands:
            # startswith also copes with an empty string, which band[0]
            # would reject with an IndexError.
            if isinstance(band, str) and band.startswith('n'):
                nr_bts = self._add_nr_cell()
                nr_bts.set_band(int(band[1:]))
                # set secondary cells to dl only to avoid running out of
                # resources in high CA scenarios (applies from the third
                # NR cell on)
                if n_nr_cells > 2:
                    nr_bts.set_dl_only(True)
                n_nr_cells += 1
            else:
                lte_bts = self._add_lte_cell()
                lte_bts.set_band(int(band))
                # same rule as for NR: from the third LTE cell on
                if n_lte_cells > 2:
                    lte_bts.set_dl_only(True)
                n_lte_cells += 1

        # turn on primary lte and/or nr cells
        self._network.apply_changes()
        self.turn_on_primary_cells()

    def _add_nr_cell(self):
        """Creates a new NR cell and configures antenna ports.

        Returns:
            the base station object wrapping the new cell; it is also
            appended to self.bts.
        """
        from mrtype.counters import N310
        nr_cell = self._network.create_nr_cell()
        nr_cell_max_config = nr_cell.stub.GetMaximumConfiguration()
        nr_cell_max_config.csi_rs_antenna_ports = self.MAX_CSI_RS_PORTS
        nr_cell.stub.SetMaximumConfiguration(nr_cell_max_config)
        # Sets n310 timer to N310.N20 to make endc more stable
        nr_cell.set_n310(N310.N20)

        nr_bts = self.create_base_station(nr_cell)
        self.bts.append(nr_bts)
        return nr_bts

    def _add_lte_cell(self):
        """Creates a new LTE cell and configures antenna ports.

        Returns:
            the base station object wrapping the new cell; it is also
            appended to self.bts.
        """
        lte_cell = self._network.create_lte_cell()
        lte_cell_max_config = lte_cell.stub.GetMaximumConfiguration()
        lte_cell_max_config.csi_rs_antenna_ports = self.MAX_CSI_RS_PORTS
        lte_cell_max_config.crs_antenna_ports = self.MAX_CRS_PORTS
        lte_cell.stub.SetMaximumConfiguration(lte_cell_max_config)
        lte_bts = self.create_base_station(lte_cell)

        self.bts.append(lte_bts)
        return lte_bts

    def _initial_xlapi(self):
        """Imports XLAPI and stores the modules and defaults on the object.

        The imports are done here and not at module level because the XLAPI
        packages only become importable after the constructor has extended
        sys.path.
        """
        import xlapi
        import mrtype
        from xlapi import network
        from xlapi import settings
        from rs_mrt.testenvironment.signaling.sri.rat.common import (
            CsiRsAntennaPorts)
        from rs_mrt.testenvironment.signaling.sri.rat.lte import CrsAntennaPorts

        self._xlapi = xlapi
        self._network = network
        self._settings = settings

        # initialize defaults
        self.MAX_CSI_RS_PORTS = (
            CsiRsAntennaPorts.NUMBER_CSI_RS_ANTENNA_PORTS_FOUR)
        self.MAX_CRS_PORTS = CrsAntennaPorts.NUMBER_CRS_ANTENNA_PORTS_FOUR

    def configure_mimo_settings(self, mimo, bts_index=0):
        """Sets the mimo scenario for the test.

        Args:
            mimo: mimo scenario to set.
            bts_index: index into self.bts of the base station to configure.
        """
        self.bts[bts_index].set_mimo_mode(mimo)

    @property
    def connection_type(self):
        """Gets the connection type applied in callbox.

        Returns:
            the RrcConnectionState member whose value matches the rrc
            connection state reported for the dut.
        """
        state = self.dut.state.rrc_connection_state
        return RrcConnectionState(state.value)

    def create_base_station(self, cell):
        """Creates the base station object with cell and current object.

        Args:
            cell: the XLAPI cell.

        Returns:
            base station object.
        Raise:
            CmxError if the cell is neither LTE nor NR.
        """
        from xlapi.lte_cell import LteCell
        from xlapi.nr_cell import NrCell
        if isinstance(cell, LteCell):
            return LteBaseStation(self, cell)
        elif isinstance(cell, NrCell):
            return NrBaseStation(self, cell)
        else:
            raise CmxError('The cell type is neither LTE nor NR')

    def detach(self):
        """Detach callbox and controller by stopping every base station."""
        for bts in self.bts:
            bts.stop()

    def disable_packet_switching(self):
        """Disable packet switching in call box.

        Raises:
            NotImplementedError: always, the feature is not implemented.
        """
        raise NotImplementedError()

    def clear_network(self):
        """Wipes the network configuration."""
        self.bts.clear()
        self._network.reset()
        # dut stub becomes stale after resetting
        self.dut = self._network.get_dut()

    def disconnect(self):
        """Disconnect controller from device and switch to local mode.

        The socket is closed even if wiping the network fails, so that a
        failing reset does not leak the connection.
        """
        try:
            self.clear_network()
        finally:
            self._close_socket()

    def enable_packet_switching(self):
        """Enable packet switching in call box.

        Raises:
            NotImplementedError: always, the feature is not implemented.
        """
        raise NotImplementedError()

    def get_cell_configs(self, cell):
        """Gets cell settings.

        This method is for debugging purpose. In XLAPI, there are many get
        methods in the cell or component carrier object. When this method is
        called, the corresponding value could be recorded with logging, which is
        useful for debug.

        Args:
            cell: the lte or nr cell in xlapi
        """
        cell_getters = [attr for attr in dir(cell) if attr.startswith('get')]
        for attr in cell_getters:
            # Best effort: a failing getter is logged and must not abort the
            # dump of the remaining values, hence the broad except.
            try:
                getter = getattr(cell, attr)
                logger.info('The {} is {}'.format(attr, getter()))
            except Exception as e:
                logger.warning('Error in get {}: {}'.format(attr, e))

    def get_base_station(self, bts_index=0):
        """Gets the base station object based on bts index. By default
        bts_index set to 0 (PCC).

        Args:
            bts_index: base station identifier (index into self.bts)

        Returns:
            base station object.
        """
        return self.bts[bts_index]

    def get_network(self):
        """ Gets the network object from cmx500 object."""
        return self._network

    def init_lte_measurement(self):
        """Gets the class object for lte measurement which can be used to
        initiate measurements.

        Returns:
            lte measurement object.

        Raises:
            NotImplementedError: always, the feature is not implemented.
        """
        raise NotImplementedError()

    def network_apply_changes(self):
        """Uses self._network to apply changes"""
        self._network.apply_changes()

    def reset(self):
        """System level reset, implemented as a disconnect."""

        self.disconnect()

    @property
    def rrc_connection(self):
        """Gets the RRC connection state (is_connected flag of the dut)."""
        return self.dut.state.rrc.is_connected

    def set_timer(self, timeout):
        """Sets timer for the Cmx500 class.

        Args:
            timeout: new value of lte_rrc_state_change_timer; the timer is
                marked as enabled as well.
        """
        self.rrc_state_change_time_enable = True
        self.lte_rrc_state_change_timer = timeout

    def switch_lte_signalling(self, state):
        """ Turns LTE signalling ON/OFF.

        Args:
            state: an instance of LteState indicating the state to which LTE
                   signal has to be set.

        Raises:
            ValueError: if state is not an LteState.
            CmxError: if there is no LTE cell to switch.
        """
        if not isinstance(state, LteState):
            raise ValueError('state should be the instance of LteState.')

        if self.primary_lte_cell:
            self.set_bts_enabled(state.value == 'ON', self.primary_lte_cell)
        else:
            raise CmxError(
                'Unable to set LTE signalling to {},'.format(state.value) +
                ' no LTE cell found.')

    def switch_on_nsa_signalling(self):
        """Switches on the primary LTE cell and then the primary NR cell.

        Raises:
            CmxError: if either the LTE or the NR cell cannot be found.
        """
        logger.info('Switches on NSA signalling')

        if self.primary_lte_cell:
            self.set_bts_enabled(True, self.primary_lte_cell)
        else:
            raise CmxError(
                'Unable to turn on NSA signalling, no LTE cell found.')

        if self.primary_nr_cell:
            self.set_bts_enabled(True, self.primary_nr_cell)
        else:
            raise CmxError(
                'Unable to turn on NSA signalling, no NR cell found.')

        # NOTE(review): fixed delay after both cells are on; presumably
        # gives the signalling time to settle -- confirm.
        time.sleep(5)

    @property
    def primary_cell(self):
        """Gets the primary cell in the current scenario.

        Returns:
            the base station flagged as primary cell, else the first base
            station, else None when there are no base stations.
        """
        # If the simulation has an active cell return it as the primary
        # otherwise default to the first cell.
        for cell in self.bts:
            if cell.is_primary_cell:
                return cell

        return self.bts[0] if self.bts else None

    @property
    def primary_lte_cell(self):
        """Gets the primary LTE cell in the current scenario.

        Note: This should generally be the same as primary_cell unless we're
        connected to a NR cell in a TA with both LTE and NR cells.
        """
        cell = self.primary_cell
        if isinstance(cell, LteBaseStation):
            return cell

        # find the first cell in the same ta as the primary cell
        elif cell:
            cells = self._get_cells_in_same_ta(cell)
            return next((c for c in cells if isinstance(c, LteBaseStation)),
                        None)

        return None

    @property
    def primary_nr_cell(self):
        """Gets the primary NR cell in the current scenario.

        Note: This may be the PSCell for NSA scenarios.
        """
        cell = self.primary_cell
        if cell and isinstance(cell, NrBaseStation):
            return cell

        # find the first NR cell in the same ta as the primary cell
        elif cell:
            cells = self._get_cells_in_same_ta(cell)
            return next((c for c in cells if isinstance(c, NrBaseStation)),
                        None)

        return None

    @property
    def lte_cells(self):
        """Gets all LTE cells in the current scenario."""
        return [bts for bts in self.bts if isinstance(bts, LteBaseStation)]

    @property
    def nr_cells(self):
        """Gets all NR cells in the current scenario."""
        return [bts for bts in self.bts if isinstance(bts, NrBaseStation)]

    @property
    def secondary_lte_cells(self):
        """Gets a list of all LTE cells in the same TA as the primary cell.

        The primary cell itself is not part of the list.
        """
        pcell = self.primary_cell
        if not pcell or not self.lte_cells:
            return []

        # If NR cell is primary then there are no secondary LTE cells.
        if not isinstance(pcell, LteBaseStation):
            return []

        return [
            c for c in self._get_cells_in_same_ta(pcell)
            if isinstance(c, LteBaseStation)
        ]

    @property
    def secondary_nr_cells(self):
        """Gets a list of all NR cells in the same TA as the primary NR cell.

        The primary NR cell itself is not part of the list.
        """
        pcell = self.primary_nr_cell
        if not pcell or not self.nr_cells:
            return []

        return [
            c for c in self._get_cells_in_same_ta(pcell)
            if isinstance(c, NrBaseStation)
        ]

    @property
    def secondary_cells(self):
        """Gets all secondary cells in the current scenario."""
        return self.secondary_lte_cells + self.secondary_nr_cells

    @property
    def tracking_areas(self):
        """Returns a list of LTE and 5G tracking areas in the simulation."""
        plmn = self._network.get_or_create_plmn()
        return plmn.eps_ta_list + plmn.fivegs_ta_list

    def _get_cells_in_same_ta(self, bts):
        """Returns a list of all other cells in the same TA as bts.

        Args:
            bts: the base station whose tracking area is searched.

        Returns:
            the base stations of self.bts (without bts itself) whose cell is
            in the same tracking area; an empty list if the cell of bts is
            not part of any tracking area.
        """
        # A default is passed to next() so that a cell without tracking area
        # does not leak a bare StopIteration to the callers.
        tracking_area = next(
            (t for t in self.tracking_areas if bts._cell in t.cells), None)
        if tracking_area is None:
            return []
        return [
            c for c in self.bts if c != bts and c._cell in tracking_area.cells
        ]

    def set_bts_enabled(self, enabled, bts):
        """Switch bts signalling on/off.

        Nothing is done when the bts already is in the requested state.

        Args:
            enabled: True/False if the  signalling should be enabled.
            bts: The bts to configure.

        Raises:
            CmxError: if the cell does not reach the requested state within
                cell_switch_on_timer.
        """
        if enabled and not bts.is_on():
            bts.start()
            state = bts.wait_cell_enabled(self.cell_switch_on_timer, True)
            if state:
                logger.info('The cell status is on')
            else:
                raise CmxError('The cell cannot be switched on')

        elif not enabled and bts.is_on():
            bts.stop()
            state = bts.wait_cell_enabled(self.cell_switch_on_timer, False)
            if not state:
                logger.info('The cell status is off')
            else:
                raise CmxError('The cell cannot be switched off')

    @property
    def use_carrier_specific(self):
        """Gets current status of carrier specific duplex configuration.

        Raises:
            NotImplementedError: always, the feature is not implemented.
        """
        raise NotImplementedError()

    @use_carrier_specific.setter
    def use_carrier_specific(self, state):
        """Sets the carrier specific duplex configuration.

        Args:
            state: ON/OFF UCS configuration.

        Raises:
            NotImplementedError: always, the feature is not implemented.
        """
        raise NotImplementedError()

    def set_keep_rrc(self, keep_connected):
        """Sets if the CMX should maintain RRC connection after registration.

        Args:
            keep_connected: true if cmx should stay in RRC_CONNECTED state or
                false if UE preference should be used.
        """
        state = 'ON' if keep_connected else 'OFF'
        # The same setting is sent for EPS and for FGS.
        self._send('CONFigure:SIGNaling:EPS:NBEHavior:KRRC {}'.format(state))
        self._send('CONFigure:SIGNaling:FGS:NBEHavior:KRRC {}'.format(state))

    def turn_off_neighbor_cells(self):
        """Turns off all cells in other tracking areas."""
        for cell in self.bts:
            if not cell.is_active:
                self.set_bts_enabled(False, cell)

    def turn_on_primary_cells(self):
        """Turns on all primary cells."""
        if self.primary_lte_cell:
            self.set_bts_enabled(True, self.primary_lte_cell)
        if self.primary_nr_cell:
            self.set_bts_enabled(True, self.primary_nr_cell)

    def turn_on_secondary_cells(self):
        """Turns on all secondary cells."""
        for bts in self.secondary_cells:
            self.set_bts_enabled(True, bts)

    def handover(self, primary, secondary=None):
        """Performs a Inter/Intra-RAT handover.

        Args:
            primary: the new primary bts.
            secondary: the new secondary bts.
        """
        self.dut.signaling.handover_to(primary.cell,
                                       secondary.cell if secondary else None,
                                       None)

    def wait_for_rrc_state(self, state, timeout=120):
        """ Waits until a certain RRC state is set.

        The state is polled once per second; the first check happens after
        the first second.

        Args:
            state: the RRC state that is being waited for.
            timeout: timeout for phone to be in connected state.

        Returns:
            True once the state is reached.

        Raises:
            CmxError on time out.
        """
        is_idle = (state.value == 'OFF')
        for idx in range(timeout):
            time.sleep(1)
            if self.dut.state.rrc.is_idle == is_idle:
                logger.info('{} reached at {} s'.format(state.value, idx))
                return True
        error_message = 'Waiting for {} state timeout after {}'.format(
            state.value, timeout)
        logger.error(error_message)
        raise CmxError(error_message)

    def wait_until_attached(self, timeout=120):
        """Waits until Lte attached.

        Args:
            timeout: timeout for phone to get attached.

        Raises:
            CmxError if there are no cells or on time out.
        """
        if not self.bts:
            raise CmxError('cannot attach device, no cells found')

        self.primary_cell.wait_for_attach(timeout)

    def send_sms(self, message):
        """ Sends an SMS message to the DUT.

        Args:
            message: the SMS message to send.
        """
        self.dut.signaling.mt_sms(message)
642
643
class BaseStation(object):
    """Class to interact with the different base stations.

    A base station wraps one XLAPI cell together with the component carrier
    that the dut of the controller reports for that cell.
    """
    def __init__(self, cmx, cell):
        """Init method to setup variables for base station.

        Args:
            cmx: Controller (Cmx500) object.
            cell: The cell for the base station.
        """

        self._cell = cell
        self._cmx = cmx
        self._cc = cmx.dut.cc(cell)
        self._network = cmx.get_network()

    @property
    def band(self):
        """Gets the current band of cell.

        Return:
            the band number in int.
        """
        cell_band = self._cell.get_band()
        return int(cell_band)

    @property
    def dl_power(self):
        """Gets the total downlink power of the cell.

        Return:
            the power level in dbm.
        """
        return self._cell.get_total_dl_power().in_dBm()

    @property
    def duplex_mode(self):
        """Gets current duplex of cell.

        Return:
            the DuplexMode that matches the band of the cell, or None if the
            band reports none of FDD, TDD and downlink only.
        """
        band = self._cell.get_band()
        if band.is_fdd():
            return DuplexMode.FDD
        if band.is_tdd():
            return DuplexMode.TDD
        if band.is_dl_only():
            return DuplexMode.DL_ONLY
        return None

    def get_cc(self):
        """Gets component carrier of the cell."""
        return self._cc

    def is_on(self):
        """Verifies if the cell is turned on.

            Return:
                boolean (if the cell is on).
        """
        return self._cell.is_on()

    def set_band(self, band):
        """Sets the Band of cell.

        Args:
            band: band of cell.
        """
        self._cell.set_band(band)
        # The band is read back so that the log shows the effective value.
        logger.info('The band is set to {} and is {} after setting'.format(
            band, self.band))

    def set_dl_mac_padding(self, state):
        """Enables/Disables downlink padding at the mac layer.

        Args:
            state: a boolean
        """
        self._cc.set_dl_mac_padding(state)

    def set_dl_power(self, pwlevel):
        """Modifies the total downlink power of the cell.

        Args:
            pwlevel: power level in dBm.
        """
        self._cell.set_total_dl_power(pwlevel)

    def set_ul_power(self, ul_power):
        """Sets the target ul power of the component carrier.

        Args:
            ul_power: the uplink power in dbm
        """
        self._cc.set_target_ul_power(ul_power)

    def set_dl_only(self, dl_only):
        """Sets if the cell should be in downlink only mode.

        Args:
            dl_only: bool indicating if the cell should be downlink only.
        """
        self._cell.dl_only = dl_only

    def start(self):
        """Starts the cell."""
        self._cell.start()

    def stop(self):
        """Stops the cell."""
        self._cell.stop()

    def wait_cell_enabled(self, timeout, enabled):
        """Waits for the requested cell state.

        The cell is polled once per second. No exception is raised on time
        out, the caller has to compare the result with the requested state.

        Args:
            timeout: the maximum time (in polls of one second) to wait.
            enabled: true if the cell should be enabled.

        Returns:
            enabled as soon as the cell reports the requested state,
            otherwise the state the cell reports after the time out.
        """
        waiting_time = 0
        while waiting_time < timeout:
            if self._cell.is_on() == enabled:
                return enabled
            waiting_time += 1
            time.sleep(1)
        # Last check after the final sleep; may still differ from enabled.
        return self._cell.is_on()

    def set_tracking_area(self, tac):
        """Sets the cells tracking area.

        The cell is added to the tracking area with the given tac (which is
        created if it does not exist) and removed from the tracking area it
        was in before, if any.

        Args:
            tac: the tracking area ID to add the cell to.
        """
        plmn = self._network.get_or_create_plmn()
        all_tas = plmn.eps_ta_list + plmn.fivegs_ta_list
        # None defaults keep a bare StopIteration from escaping when the
        # cell is in no tracking area yet or the tac is unknown.
        old_ta = next((t for t in all_tas if self._cell in t.cells), None)
        new_ta = next((t for t in all_tas if t.tac == tac), None)
        if new_ta is None:
            # create_new_ta is not defined in this class.
            # NOTE(review): presumably provided by the subclasses -- confirm.
            new_ta = self.create_new_ta(tac)

        if old_ta == new_ta:
            return

        new_ta.add_cell(self._cell)
        if old_ta is not None:
            old_ta.remove_cell(self._cell)

    @property
    def is_primary_cell(self):
        """Returns true if the cell is the current primary cell."""
        return self._cell == self._cmx.dut.state.pcell

    @property
    def is_active(self):
        """Returns true if the cell is part of the current simulation.

        A cell is considered "active" if it is the primary cell
        or is in the same TA as the primary cell, i.e. not a neighbor cell.
        """
        return (self == self._cmx.primary_lte_cell
                or self == self._cmx.primary_nr_cell
                or self in self._cmx.secondary_cells)

    @property
    def cell(self):
        """Returns the underlying XLAPI cell object."""
        return self._cell
809
810
811class LteBaseStation(BaseStation):
812    """ LTE base station."""
813    def __init__(self, cmx, cell):
814        """Init method to setup variables for the LTE base station.
815
816        Args:
817            cmx: Controller (Cmx500) object.
818            cell: The cell for the LTE base station.
819        """
820        from xlapi.lte_cell import LteCell
821        if not isinstance(cell, LteCell):
822            raise CmxError(
823                'The cell is not a LTE cell, LTE base station  fails'
824                ' to create.')
825        super().__init__(cmx, cell)
826
827    def _config_scheduler(self,
828                          dl_mcs=None,
829                          dl_rb_alloc=None,
830                          dl_dci_ncce=None,
831                          dl_dci_format=None,
832                          dl_tm=None,
833                          dl_num_layers=None,
834                          dl_mcs_table=None,
835                          ul_mcs=None,
836                          ul_rb_alloc=None,
837                          ul_dci_ncce=None):
838
839        from rs_mrt.testenvironment.signaling.sri.rat.lte import DciFormat
840        from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode
841        from rs_mrt.testenvironment.signaling.sri.rat.lte import MaxLayersMIMO
842        from rs_mrt.testenvironment.signaling.sri.rat.lte import McsTable
843        from rs_mrt.testenvironment.signaling.sri.rat.lte import PdcchFormat
844
845        log_list = []
846        if dl_mcs:
847            log_list.append('dl_mcs: {}'.format(dl_mcs))
848        if ul_mcs:
849            log_list.append('ul_mcs: {}'.format(ul_mcs))
850        if dl_rb_alloc:
851            if not isinstance(dl_rb_alloc, tuple):
852                dl_rb_alloc = (0, dl_rb_alloc)
853            log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc))
854        if ul_rb_alloc:
855            if not isinstance(ul_rb_alloc, tuple):
856                ul_rb_alloc = (0, ul_rb_alloc)
857            log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc))
858        if dl_dci_ncce:
859            dl_dci_ncce = PdcchFormat(dl_dci_ncce)
860            log_list.append('dl_dci_ncce: {}'.format(dl_dci_ncce))
861        if ul_dci_ncce:
862            ul_dci_ncce = PdcchFormat(ul_dci_ncce)
863            log_list.append('ul_dci_ncce: {}'.format(ul_dci_ncce))
864        if dl_dci_format:
865            dl_dci_format = DciFormat(dl_dci_format)
866            log_list.append('dl_dci_format: {}'.format(dl_dci_format))
867        if dl_tm:
868            dl_tm = DlTransmissionMode(dl_tm.value)
869            log_list.append('dl_tm: {}'.format(dl_tm))
870        if dl_num_layers:
871            dl_num_layers = MaxLayersMIMO(dl_num_layers)
872            log_list.append('dl_num_layers: {}'.format(dl_num_layers))
873        if dl_mcs_table:
874            dl_mcs_table = McsTable(dl_mcs_table)
875            log_list.append('dl_mcs_table: {}'.format(dl_mcs_table))
876
877        num_crs_antenna_ports = self._cell.get_num_crs_antenna_ports()
878
879        # Sets num of crs antenna ports to 4 for configuring
880        self._cell.set_num_crs_antenna_ports(4)
881        scheduler = self._cmx.dut.get_scheduler(self._cell)
882        logger.info('configure scheduler for {}'.format(','.join(log_list)))
883        scheduler.configure_scheduler(dl_mcs=dl_mcs,
884                                      dl_rb_alloc=dl_rb_alloc,
885                                      dl_dci_ncce=dl_dci_ncce,
886                                      dl_dci_format=dl_dci_format,
887                                      dl_tm=dl_tm,
888                                      dl_num_layers=dl_num_layers,
889                                      dl_mcs_table=dl_mcs_table,
890                                      ul_mcs=ul_mcs,
891                                      ul_rb_alloc=ul_rb_alloc,
892                                      ul_dci_ncce=ul_dci_ncce)
893        logger.info('Configure scheduler succeeds')
894
895        # Sets num of crs antenna ports back to previous value
896        self._cell.set_num_crs_antenna_ports(num_crs_antenna_ports)
897        self._network.apply_changes()
898
899    @property
900    def bandwidth(self):
901        """Get the channel bandwidth of the cell.
902
903        Return:
904            the number rb of the bandwidth.
905        """
906        return self._cell.get_bandwidth().num_rb
907
908    @property
909    def dl_channel(self):
910        """Gets the downlink channel of cell.
911
912        Return:
913            the downlink channel (earfcn) in int.
914        """
915        return int(self._cell.get_dl_earfcn())
916
917    @property
918    def dl_frequency(self):
919        """Get the downlink frequency of the cell."""
920        from mrtype.frequency import Frequency
921        return self._cell.get_dl_earfcn().to_freq().in_units(
922            Frequency.Units.GHz)
923
924    def _to_rb_bandwidth(self, bandwidth):
925        for idx in range(5):
926            if bandwidth < LTE_MHZ_UPPER_BOUND_TO_RB[idx][0]:
927                return LTE_MHZ_UPPER_BOUND_TO_RB[idx][1]
928        return 100
929
930    def disable_all_ul_subframes(self):
931        """Disables all ul subframes for LTE cell."""
932        self._cc.disable_all_ul_subframes()
933        self._network.apply_changes()
934        logger.info('lte cell disable all ul subframes completed')
935
936    def set_bandwidth(self, bandwidth):
937        """Sets the channel bandwidth of the cell.
938
939        Args:
940            bandwidth: channel bandwidth of cell in MHz.
941        """
942        self._cell.set_bandwidth(self._to_rb_bandwidth(bandwidth))
943        self._network.apply_changes()
944
945    def set_cdrx_config(self, config):
946        """Configures LTE cdrx with lte config parameters.
947
948        config: The LteCellConfig for current base station.
949        """
950
951        logger.info(
952            f'Configure Lte drx with\n'
953            f'drx_on_duration_timer: {config.drx_on_duration_timer}\n'
954            f'drx_inactivity_timer: {config.drx_inactivity_timer}\n'
955            f'drx_retransmission_timer: {config.drx_retransmission_timer}\n'
956            f'drx_long_cycle: {config.drx_long_cycle}\n'
957            f'drx_long_cycle_offset: {config.drx_long_cycle_offset}'
958        )
959
960        from mrtype.lte.drx import (
961            LteDrxConfig,
962            LteDrxInactivityTimer,
963            LteDrxOnDurationTimer,
964            LteDrxRetransmissionTimer,
965        )
966
967        from mrtype.lte.drx import LteDrxLongCycleStartOffset as longCycle
968
969        long_cycle_mapping = {
970            10: longCycle.ms10, 20: longCycle.ms20, 32: longCycle.ms32,
971            40: longCycle.ms40, 60: longCycle.ms60, 64: longCycle.ms64,
972            70: longCycle.ms70, 80: longCycle.ms80, 128: longCycle.ms128,
973            160: longCycle.ms160, 256: longCycle.ms256, 320: longCycle.ms320,
974            512: longCycle.ms512, 640: longCycle.ms640, 1280: longCycle.ms1280,
975            2048: longCycle.ms2048, 2560: longCycle.ms2560
976        }
977
978        drx_on_duration_timer = LteDrxOnDurationTimer(
979            int(config.drx_on_duration_timer)
980        )
981        drx_inactivity_timer = LteDrxInactivityTimer(
982            int(config.drx_inactivity_timer)
983        )
984        drx_retransmission_timer = LteDrxRetransmissionTimer(
985            int(config.drx_retransmission_timer)
986        )
987        drx_long_cycle = long_cycle_mapping[int(config.drx_long_cycle)]
988        drx_long_cycle_offset = drx_long_cycle(config.drx_long_cycle_offset)
989
990        lte_drx_config = LteDrxConfig(
991            on_duration_timer=drx_on_duration_timer,
992            inactivity_timer=drx_inactivity_timer,
993            retransmission_timer=drx_retransmission_timer,
994            long_cycle_start_offset=drx_long_cycle_offset,
995            short_drx=None
996        )
997
998        self._cmx.dut.lte_cell_group().set_drx_and_adjust_scheduler(
999            drx_config=lte_drx_config
1000        )
1001        self._network.apply_changes()
1002
1003    def set_default_cdrx_config(self):
1004        """Sets default LTE cdrx config for endc (for legacy code)."""
1005        from mrtype.lte.drx import (
1006            LteDrxConfig,
1007            LteDrxInactivityTimer,
1008            LteDrxLongCycleStartOffset,
1009            LteDrxOnDurationTimer,
1010            LteDrxRetransmissionTimer,
1011        )
1012
1013        logger.info('Config Lte drx config')
1014        lte_drx_config = LteDrxConfig(
1015            on_duration_timer=LteDrxOnDurationTimer.PSF_10,
1016            inactivity_timer=LteDrxInactivityTimer.PSF_200,
1017            retransmission_timer=LteDrxRetransmissionTimer.PSF_33,
1018            long_cycle_start_offset=LteDrxLongCycleStartOffset.ms160(83),
1019            short_drx=None)
1020        self._cmx.dut.lte_cell_group().set_drx_and_adjust_scheduler(
1021            drx_config=lte_drx_config)
1022        self._network.apply_changes()
1023
1024    def set_cell_frequency_band(self, tdd_cfg=None, ssf_cfg=None):
1025        """Sets cell frequency band with tdd and ssf config.
1026
1027        Args:
1028            tdd_cfg: the tdd subframe assignment config in number (from 0-6).
1029            ssf_cfg: the special subframe pattern config in number (from 1-9).
1030        """
1031        from rs_mrt.testenvironment.signaling.sri.rat.lte import SpecialSubframePattern
1032        from rs_mrt.testenvironment.signaling.sri.rat.lte import SubFrameAssignment
1033        from rs_mrt.testenvironment.signaling.sri.rat.lte.config import CellFrequencyBand
1034        from rs_mrt.testenvironment.signaling.sri.rat.lte.config import Tdd
1035        tdd_subframe = None
1036        ssf_pattern = None
1037        if tdd_cfg:
1038            tdd_subframe = SubFrameAssignment(tdd_cfg + 1)
1039        if ssf_cfg:
1040            ssf_pattern = SpecialSubframePattern(ssf_cfg)
1041        tdd = Tdd(tdd_config=Tdd.TddConfigSignaling(
1042            subframe_assignment=tdd_subframe,
1043            special_subframe_pattern=ssf_pattern))
1044        self._cell.stub.SetCellFrequencyBand(CellFrequencyBand(tdd=tdd))
1045        self._network.apply_changes()
1046
1047    def set_cfi(self, cfi):
1048        """Sets number of pdcch symbols (cfi).
1049
1050        Args:
1051            cfi: the value of NumberOfPdcchSymbols
1052        """
1053        from rs_mrt.testenvironment.signaling.sri.rat.lte import NumberOfPdcchSymbols
1054        from rs_mrt.testenvironment.signaling.sri.rat.lte.config import PdcchRegionReq
1055
1056        logger.info('The cfi enum to set is {}'.format(
1057            NumberOfPdcchSymbols(cfi)))
1058        req = PdcchRegionReq()
1059        req.num_pdcch_symbols = NumberOfPdcchSymbols(cfi)
1060        self._cell.stub.SetPdcchControlRegion(req)
1061
1062    def set_dci_format(self, dci_format):
1063        """Selects the downlink control information (DCI) format.
1064
1065        Args:
1066            dci_format: supported dci.
1067        """
1068        if not isinstance(dci_format, DciFormat):
1069            raise CmxError('Wrong type for dci_format')
1070        self._config_scheduler(dl_dci_format=dci_format.value)
1071
1072    def set_dl_channel(self, channel):
1073        """Sets the downlink channel number of cell.
1074
1075        Args:
1076            channel: downlink channel number of cell.
1077        """
1078        if self.dl_channel == channel:
1079            logger.info('The dl_channel was at {}'.format(self.dl_channel))
1080            return
1081        self._cell.set_earfcn(channel)
1082        logger.info('The dl_channel was set to {}'.format(self.dl_channel))
1083
1084    def set_dl_modulation_table(self, modulation):
1085        """Sets down link modulation table.
1086
1087        Args:
1088            modulation: modulation table setting (ModulationType).
1089        """
1090        if not isinstance(modulation, ModulationType):
1091            raise CmxError('The modulation is not the type of Modulation')
1092        self._config_scheduler(dl_mcs_table=modulation.value)
1093
1094    def set_mimo_mode(self, mimo):
1095        """Sets mimo mode for Lte scenario.
1096
1097        Args:
1098            mimo: the mimo mode.
1099        """
1100        if not isinstance(mimo, MimoModes):
1101            raise CmxError("Wrong type of mimo mode")
1102
1103        self._cell.set_num_crs_antenna_ports(mimo.value)
1104
1105    def set_scheduling_mode(self,
1106                            mcs_dl=None,
1107                            mcs_ul=None,
1108                            nrb_dl=None,
1109                            nrb_ul=None):
1110        """Sets scheduling mode.
1111
1112        Args:
1113            scheduling: the new scheduling mode.
1114            mcs_dl: Downlink MCS.
1115            mcs_ul: Uplink MCS.
1116            nrb_dl: Number of RBs for downlink.
1117            nrb_ul: Number of RBs for uplink.
1118        """
1119        self._config_scheduler(dl_mcs=mcs_dl,
1120                               ul_mcs=mcs_ul,
1121                               dl_rb_alloc=nrb_dl,
1122                               ul_rb_alloc=nrb_ul)
1123
1124    def set_ssf_config(self, ssf_config):
1125        """Sets ssf subframe assignment with tdd_config.
1126
1127        Args:
1128            ssf_config: the special subframe pattern config (from 1-9).
1129        """
1130        self.set_cell_frequency_band(ssf_cfg=ssf_config)
1131
1132    def set_tdd_config(self, tdd_config):
1133        """Sets tdd subframe assignment with tdd_config.
1134
1135        Args:
1136            tdd_config: the subframe assignemnt config (from 0-6).
1137        """
1138        self.set_cell_frequency_band(tdd_cfg=tdd_config)
1139
1140    def set_transmission_mode(self, transmission_mode):
1141        """Sets transmission mode with schedular.
1142
1143        Args:
1144            transmission_mode: the download link transmission mode.
1145        """
1146        from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode
1147        if not isinstance(transmission_mode, TransmissionModes):
1148            raise CmxError('Wrong type of the trasmission mode')
1149        dl_tm = DlTransmissionMode(transmission_mode.value)
1150        logger.info('set dl tm to {}'.format(dl_tm))
1151        self._cc.set_dl_tm(dl_tm)
1152        self._network.apply_changes()
1153
1154    def set_ul_channel(self, channel):
1155        """Sets the up link channel number of cell.
1156
1157        Args:
1158            channel: up link channel number of cell.
1159        """
1160        if self.ul_channel == channel:
1161            logger.info('The ul_channel is at {}'.format(self.ul_channel))
1162            return
1163        self._cell.set_earfcn(channel)
1164        logger.info('The dl_channel was set to {}'.format(self.ul_channel))
1165
1166    @property
1167    def ul_channel(self):
1168        """Gets the uplink channel of cell.
1169
1170        Return:
1171            the uplink channel (earfcn) in int
1172        """
1173        return int(self._cell.get_ul_earfcn())
1174
1175    @property
1176    def ul_frequency(self):
1177        """Get the uplink frequency of the cell.
1178
1179        Return:
1180            The uplink frequency in GHz.
1181        """
1182        from mrtype.frequency import Frequency
1183        return self._cell.get_ul_earfcn().to_freq().in_units(
1184            Frequency.Units.GHz)
1185
1186    def set_ul_modulation_table(self, modulation):
1187        """Sets up link modulation table.
1188
1189        Args:
1190            modulation: modulation table setting (ModulationType).
1191        """
1192        if not isinstance(modulation, ModulationType):
1193            raise CmxError('The modulation is not the type of Modulation')
1194        if modulation == ModulationType.Q16:
1195            self._cell.stub.SetPuschCommonConfig(False)
1196        else:
1197            self._cell.stub.SetPuschCommonConfig(True)
1198
1199    def create_new_ta(self, tac):
1200        """Creates and initializes a new tracking area for this cell.
1201
1202        Args:
1203            tac: the tracking area ID of the new cell.
1204        Returns:
1205            ta: the new tracking area.
1206        """
1207        plmn = self._network.get_or_create_plmn()
1208        return plmn.create_eps_ta(tac)
1209
1210    def wait_for_attach(self, timeout):
1211        self._cmx.dut.signaling.wait_for_lte_attach(self._cell, timeout)
1212
1213    def attach_as_secondary_cell(self):
1214        """Attach this cell as a secondary cell."""
1215        self._cmx.dut.signaling.ca_add_scell(self._cell)
1216
1217
class NrBaseStation(BaseStation):
    """NR base station."""

    def __init__(self, cmx, cell):
        """Init method to setup variables for the NR base station.

        Args:
            cmx: Controller (Cmx500) object.
            cell: The cell for the NR base station.

        Raises:
            CmxError: if cell is not an xlapi NrCell.
        """
        from xlapi.nr_cell import NrCell
        if not isinstance(cell, NrCell):
            raise CmxError('the cell is not a NR cell, NR base station  fails'
                           ' to creat.')

        super().__init__(cmx, cell)

    def _config_scheduler(self,
                          dl_mcs=None,
                          dl_mcs_table=None,
                          dl_rb_alloc=None,
                          dl_mimo_mode=None,
                          ul_mcs=None,
                          ul_mcs_table=None,
                          ul_rb_alloc=None,
                          ul_mimo_mode=None):
        """Configures the ue scheduler of this cell and applies the changes.

        Every argument is forwarded to configure_ue_scheduler; the values
        that were provided are also written to the log.

        Args:
            dl_mcs: downlink MCS.
            dl_mcs_table: downlink MCS table value, converted to McsTable
                when truthy.
            dl_rb_alloc: downlink RB allocation; a truthy non-tuple value is
                turned into the tuple (0, value).
            dl_mimo_mode: downlink mimo mode.
            ul_mcs: uplink MCS.
            ul_mcs_table: uplink MCS table value, converted to McsTable when
                truthy.
            ul_rb_alloc: uplink RB allocation; a truthy non-tuple value is
                turned into the tuple (0, value).
            ul_mimo_mode: uplink mimo mode.
        """

        from rs_mrt.testenvironment.signaling.sri.rat.nr import McsTable

        log_list = []
        # Log-only checks compare against None so that a provided value of 0
        # (e.g. MCS index 0) still shows up in the log.
        if dl_mcs is not None:
            log_list.append('dl_mcs: {}'.format(dl_mcs))
        if ul_mcs is not None:
            log_list.append('ul_mcs: {}'.format(ul_mcs))

        # If rb alloc is not a tuple, add 0 as start RBs for XLAPI NR scheduler
        if dl_rb_alloc:
            if not isinstance(dl_rb_alloc, tuple):
                dl_rb_alloc = (0, dl_rb_alloc)
            log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc))
        if ul_rb_alloc:
            if not isinstance(ul_rb_alloc, tuple):
                ul_rb_alloc = (0, ul_rb_alloc)
            log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc))
        if dl_mcs_table:
            dl_mcs_table = McsTable(dl_mcs_table)
            log_list.append('dl_mcs_table: {}'.format(dl_mcs_table))
        if ul_mcs_table:
            ul_mcs_table = McsTable(ul_mcs_table)
            log_list.append('ul_mcs_table: {}'.format(ul_mcs_table))
        if dl_mimo_mode is not None:
            log_list.append('dl_mimo_mode: {}'.format(dl_mimo_mode))
        if ul_mimo_mode is not None:
            log_list.append('ul_mimo_mode: {}'.format(ul_mimo_mode))

        scheduler = self._cmx.dut.get_scheduler(self._cell)
        logger.info('configure scheduler for {}'.format(','.join(log_list)))

        scheduler.configure_ue_scheduler(dl_mcs=dl_mcs,
                                         dl_mcs_table=dl_mcs_table,
                                         dl_rb_alloc=dl_rb_alloc,
                                         dl_mimo_mode=dl_mimo_mode,
                                         ul_mcs=ul_mcs,
                                         ul_mcs_table=ul_mcs_table,
                                         ul_rb_alloc=ul_rb_alloc,
                                         ul_mimo_mode=ul_mimo_mode)
        logger.info('Configure scheduler succeeds')
        self._network.apply_changes()

    def wait_for_attach(self, timeout):
        """Waits for the NR registration on this cell.

        Args:
            timeout: accepted for interface compatibility; it is not
                forwarded to wait_for_nr_registration.
        """
        # NOTE(review): timeout is ignored here; confirm whether
        # wait_for_nr_registration accepts a timeout before forwarding it.
        self._cmx.dut.signaling.wait_for_nr_registration(self._cell)

    def attach_as_secondary_cell(self, scg=False):
        """Attach this cell as a secondary cell.

        Args:
            scg: bool specifing if the cell should be added as part of the
                secondary cell group.
        """
        if scg:
            self._cmx.dut.signaling.ca_add_scg_scell(self._cell)
        else:
            self._cmx.dut.signaling.ca_add_scell(self._cell)

    def activate_endc(self, endc_timer=DEFAULT_ENDC_TIMER):
        """Enable endc mode for NR cell.

        After requesting the nsa dual connection, the radio connectivity
        state is polled once per second, at most endc_timer times.

        Args:
            endc_timer: timeout for endc state

        Raises:
            CmxError: if the endc state is not reached within endc_timer
                polls.
        """
        logger.info('enable endc mode for nsa dual connection')
        self._cmx.dut.signaling.nsa_dual_connect(self._cell)
        time_count = 0
        while time_count < endc_timer:
            if str(self._cmx.dut.state.radio_connectivity) == \
                    'RadioConnectivityMode.EPS_LTE_NR':
                logger.info('enter endc mode')
                return
            time.sleep(1)
            time_count += 1
            # Emit a progress message every 30 polls.
            if time_count % 30 == 0:
                logger.info('did not reach endc at {} s'.format(time_count))
        raise CmxError('Cannot reach endc after {} s'.format(endc_timer))

    def config_flexible_slots(self):
        """Configs flexible slots for NR cell."""

        from rs_mrt.testenvironment.signaling.sri.rat.nr.config import CellFrequencyBandReq
        from rs_mrt.testenvironment.signaling.sri.rat.common import SetupRelease

        req = CellFrequencyBandReq()
        # Release the common tdd config of the fr1 tdd band.
        req.band.frequency_range.fr1.tdd.tdd_config_common.setup_release = SetupRelease.RELEASE
        self._cell.stub.SetCellFrequencyBand(req)
        self._network.apply_changes()
        logger.info('Config flexible slots completed')

    def disable_all_ul_slots(self):
        """Disable all uplink scheduling slots for NR cell"""
        self._cc.disable_all_ul_slots()
        self._network.apply_changes()
        logger.info('Disabled all uplink scheduling slots for the nr cell')

    @property
    def dl_channel(self):
        """Gets the downlink channel of cell.

        Return:
            the downlink channel (nr_arfcn) in int.
        """
        return int(self._cell.get_dl_ref_a())

    def _bandwidth_to_carrier_bandwidth(self, bandwidth):
        """Converts bandwidth in MHz to CarrierBandwidth.
            CarrierBandwidth Enum in XLAPI:
                MHZ_5 = 0
                MHZ_10 = 1
                MHZ_15 = 2
                MHZ_20 = 3
                MHZ_25 = 4
                MHZ_30 = 5
                MHZ_40 = 6
                MHZ_50 = 7
                MHZ_60 = 8
                MHZ_70 = 9
                MHZ_80 = 10
                MHZ_90 = 11
                MHZ_100 = 12
                MHZ_200 = 13
                MHZ_400 = 14
        Args:
            bandwidth: channel bandwidth in MHz.

        Return:
            the corresponding NR Carrier Bandwidth.
        """
        from mrtype.nr.frequency import CarrierBandwidth
        if bandwidth > 100:
            # 200 -> 13, 400 -> 14
            return CarrierBandwidth(12 + bandwidth // 200)
        elif bandwidth > 30:
            # 10 MHz steps: 40 -> 6 ... 100 -> 12
            return CarrierBandwidth(2 + bandwidth // 10)
        else:
            # 5 MHz steps: 5 -> 0 ... 30 -> 5
            return CarrierBandwidth(bandwidth // 5 - 1)

    def set_bandwidth(self, bandwidth, scs=None):
        """Sets the channel bandwidth of the cell.

        Args:
            bandwidth: channel bandwidth of cell.
            scs: subcarrier spacing (SCS) of resource grid 0; when not given
                the scs currently reported by the cell is used.
        """
        if not scs:
            scs = self._cell.get_scs()
        self._cell.set_carrier_bandwidth_and_scs(
            self._bandwidth_to_carrier_bandwidth(bandwidth), scs)
        logger.info(
            'The bandwidth in MHz is {}. After setting, the value is {}'.
            format(bandwidth, str(self._cell.get_carrier_bandwidth())))

    def set_cdrx_config(self, config):
        """Configures NR cdrx with nr config parameters.

        Args:
            config: The NrCellConfig for current base station.

        Raises:
            KeyError: if config.drx_long_cycle is not one of the long cycle
                lengths listed in the mapping below.
        """
        logger.info(
            f'Configure Nr drx with\n'
            f'drx_on_duration_timer: {config.drx_on_duration_timer}\n'
            f'drx_inactivity_timer: {config.drx_inactivity_timer}\n'
            f'drx_retransmission_timer_dl: {config.drx_retransmission_timer_dl}\n'
            f'drx_retransmission_timer_ul: {config.drx_retransmission_timer_ul}\n'
            f'drx_long_cycle: {config.drx_long_cycle}\n'
            f'drx_long_cycle_offset: {config.drx_long_cycle_offset}\n'
            f'harq_rtt_timer_dl: {config.harq_rtt_timer_dl}\n'
            f'harq_rtt_timer_ul: {config.harq_rtt_timer_ul}\n'
            f'slot_offset: {config.slot_offset}\n'
        )

        from mrtype.nr.drx import (
            NrDrxConfig,
            NrDrxInactivityTimer,
            NrDrxOnDurationTimer,
            NrDrxRetransmissionTimer,
            NrDrxHarqRttTimer,
            NrDrxSlotOffset,
        )

        from mrtype.nr.drx import NrDrxLongCycleStartOffset as longCycle

        # Maps the long cycle length to the matching start-offset factory.
        long_cycle_mapping = {
            10: longCycle.ms10, 20: longCycle.ms20, 32: longCycle.ms32,
            40: longCycle.ms40, 60: longCycle.ms60, 64: longCycle.ms64,
            70: longCycle.ms70, 80: longCycle.ms80, 128: longCycle.ms128,
            160: longCycle.ms160, 256: longCycle.ms256, 320: longCycle.ms320,
            512: longCycle.ms512, 640: longCycle.ms640, 1024: longCycle.ms1024,
            1280: longCycle.ms1280, 2048: longCycle.ms2048,
            2560: longCycle.ms2560,
        }

        drx_on_duration_timer = NrDrxOnDurationTimer(
            int(config.drx_on_duration_timer)
        )
        drx_inactivity_timer = NrDrxInactivityTimer(
            int(config.drx_inactivity_timer)
        )
        drx_retransmission_timer_dl = NrDrxRetransmissionTimer(
            int(config.drx_retransmission_timer_dl)
        )
        drx_retransmission_timer_ul = NrDrxRetransmissionTimer(
            int(config.drx_retransmission_timer_ul)
        )
        drx_long_cycle = long_cycle_mapping[int(config.drx_long_cycle)]
        drx_long_cycle_offset = drx_long_cycle(
            int(config.drx_long_cycle_offset)
        )
        harq_rtt_timer_dl = NrDrxHarqRttTimer(config.harq_rtt_timer_dl)
        harq_rtt_timer_ul = NrDrxHarqRttTimer(config.harq_rtt_timer_ul)
        slot_offset = NrDrxSlotOffset(config.slot_offset)

        nr_drx_config = NrDrxConfig(
            on_duration_timer=drx_on_duration_timer,
            inactivity_timer=drx_inactivity_timer,
            retransmission_timer_dl=drx_retransmission_timer_dl,
            retransmission_timer_ul=drx_retransmission_timer_ul,
            long_cycle_start_offset=drx_long_cycle_offset,
            harq_rtt_timer_dl=harq_rtt_timer_dl,
            harq_rtt_timer_ul=harq_rtt_timer_ul,
            slot_offset=slot_offset,
        )

        self._cmx.dut.nr_cell_group().set_drx_and_adjust_scheduler(
            nr_drx_config
        )
        self._network.apply_changes()

    def set_dl_channel(self, channel):
        """Sets the downlink channel number of cell.

        Args:
            channel: downlink channel number of cell or Frequency Range ('LOW',
                     'MID' or 'HIGH'). Any string other than 'MID' or 'HIGH'
                     (case-insensitive) selects the LOW range.
        """
        from mrtype.nr.frequency import NrArfcn
        from mrtype.frequency import FrequencyRange

        # When the channel is a string, set it using the Frequency Range
        if isinstance(channel, str):
            logger.info('Sets dl channel Frequency Range {}'.format(channel))
            frequency_range = FrequencyRange.LOW
            if channel.upper() == 'MID':
                frequency_range = FrequencyRange.MID
            elif channel.upper() == 'HIGH':
                frequency_range = FrequencyRange.HIGH
            self._cell.set_dl_ref_a_offset(self.band, frequency_range)
            logger.info('The dl_channel was set to {}'.format(self.dl_channel))
            return
        if self.dl_channel == channel:
            logger.info('The dl_channel was at {}'.format(self.dl_channel))
            return
        self._cell.set_dl_ref_a_offset(self.band, NrArfcn(channel))
        logger.info('The dl_channel was set to {}'.format(self.dl_channel))

    def set_dl_modulation_table(self, modulation):
        """Sets down link modulation table.

        Args:
            modulation: modulation table setting (ModulationType).

        Raises:
            CmxError: if modulation is not a ModulationType.
        """
        if not isinstance(modulation, ModulationType):
            raise CmxError('The modulation is not the type of Modulation')
        self._config_scheduler(dl_mcs_table=modulation.value)

    def set_mimo_mode(self, mimo):
        """Sets mimo mode for NR nsa scenario.

        Args:
            mimo: the mimo mode.

        Raises:
            CmxError: if mimo is not a MimoModes.
        """
        from rs_mrt.testenvironment.signaling.sri.rat.nr import DownlinkMimoMode
        if not isinstance(mimo, MimoModes):
            raise CmxError("Wrong type of mimo mode")

        self._config_scheduler(dl_mimo_mode=DownlinkMimoMode.Enum(mimo.value))

    def set_scheduling_mode(self,
                            mcs_dl=None,
                            mcs_ul=None,
                            nrb_dl=None,
                            nrb_ul=None):
        """Sets scheduling mode.

        Args:
            mcs_dl: Downlink MCS.
            mcs_ul: Uplink MCS.
            nrb_dl: Number of RBs for downlink.
            nrb_ul: Number of RBs for uplink.
        """
        self._config_scheduler(dl_mcs=mcs_dl,
                               ul_mcs=mcs_ul,
                               dl_rb_alloc=nrb_dl,
                               ul_rb_alloc=nrb_ul)

    def set_ssf_config(self, ssf_config):
        """Not implemented for NR; always raises.

        Args:
            ssf_config: the special subframe pattern config (from 1-9).

        Raises:
            CmxError: always.
        """
        raise CmxError('the set ssf config for nr did not implemente yet')

    def set_tdd_config(self, tdd_config):
        """Not implemented for NR; always raises.

        Args:
            tdd_config: the subframe assignment config (from 0-6).

        Raises:
            CmxError: always.
        """
        raise CmxError('the set tdd config for nr did not implemente yet')

    def set_transmission_mode(self, transmission_mode):
        """Logs that the NR transmission mode is set by the mimo mode.

        No configuration is changed by this method.

        Args:
            transmission_mode: the download link transmission mode (unused).
        """
        logger.info('The set transmission mode for nr is set by mimo mode')

    def set_ul_modulation_table(self, modulation):
        """Sets up link modulation table.

        Args:
            modulation: modulation table setting (ModulationType).

        Raises:
            CmxError: if modulation is not a ModulationType.
        """
        if not isinstance(modulation, ModulationType):
            raise CmxError('The modulation is not the type of Modulation')
        self._config_scheduler(ul_mcs_table=modulation.value)

    def create_new_ta(self, tac):
        """Creates and initializes a new tracking area for this cell.

        Args:
            tac: the tracking area ID of the new cell.
        Returns:
            ta: the new tracking area.
        """
        plmn = self._network.get_or_create_plmn()
        return plmn.create_fivegs_ta(tac)
1581
1582
class CmxError(Exception):
    """Exception type raised for errors related to cmx."""
1585