1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import time 19import sys 20 21from enum import Enum 22from os import path 23from acts.controllers import abstract_inst 24 25DEFAULT_XLAPI_PATH = '/home/mobileharness/Rohde-Schwarz/XLAPI/latest/venv/lib/python3.7/site-packages' 26DEFAULT_LTE_STATE_CHANGE_TIMER = 10 27DEFAULT_CELL_SWITCH_ON_TIMER = 60 28DEFAULT_ENDC_TIMER = 300 29 30logger = logging.getLogger('Xlapi_cmx500') 31 32LTE_CELL_PROPERTIES = [ 33 'band', 34 'bandwidth', 35 'dl_earfcn', 36 'ul_earfcn', 37 'total_dl_power', 38 'p_b', 39 'dl_epre', 40 'ref_signal_power', 41 'm', 42 'beamforming_antenna_ports', 43 'p0_nominal_pusch', 44] 45 46LTE_MHZ_UPPER_BOUND_TO_RB = [ 47 (1.5, 6), 48 (4.0, 15), 49 (7.5, 25), 50 (12.5, 50), 51 (17.5, 75), 52] 53 54 55class DciFormat(Enum): 56 """Support DCI Formats for MIMOs.""" 57 DCI_FORMAT_0 = 1 58 DCI_FORMAT_1 = 2 59 DCI_FORMAT_1A = 3 60 DCI_FORMAT_1B = 4 61 DCI_FORMAT_1C = 5 62 DCI_FORMAT_2 = 6 63 DCI_FORMAT_2A = 7 64 DCI_FORMAT_2B = 8 65 DCI_FORMAT_2C = 9 66 DCI_FORMAT_2D = 10 67 68 69class DuplexMode(Enum): 70 """Duplex Modes.""" 71 FDD = 'FDD' 72 TDD = 'TDD' 73 DL_ONLY = 'DL_ONLY' 74 75 76class LteBandwidth(Enum): 77 """Supported LTE bandwidths.""" 78 BANDWIDTH_1MHz = 6 # MHZ_1 is RB_6 79 BANDWIDTH_3MHz = 15 # MHZ_3 is RB_15 80 BANDWIDTH_5MHz = 25 # MHZ_5 is RB_25 81 BANDWIDTH_10MHz = 50 # MHZ_10 is RB_50 82 BANDWIDTH_15MHz = 75 # MHZ_15 is RB_75 83 BANDWIDTH_20MHz = 100 # MHZ_20 is RB_100 84 85 86class LteState(Enum): 87 """LTE ON and OFF.""" 88 LTE_ON = 'ON' 89 LTE_OFF = 'OFF' 90 91 92class MimoModes(Enum): 93 """MIMO Modes dl antennas.""" 94 MIMO1x1 = 1 95 MIMO2x2 = 2 96 MIMO4x4 = 4 97 98 99class ModulationType(Enum): 100 """Supported Modulation Types.""" 101 Q16 = 0 102 Q64 = 1 103 Q256 = 2 104 105 106class NasState(Enum): 107 """NAS state between callbox and dut.""" 108 DEREGISTERED = 'OFF' 109 EMM_REGISTERED = 'EMM' 110 MM5G_REGISTERED = 'NR' 111 112 113class RrcState(Enum): 114 """States to enable/disable rrc.""" 115 RRC_ON = 'ON' 116 RRC_OFF = 'OFF' 117 118 119class RrcConnectionState(Enum): 120 """RRC Connection states, describes possible DUT RRC connection states.""" 121 IDLE = 1 122 IDLE_PAGING = 2 123 IDLE_CONNECTION_ESTABLISHMENT = 3 124 CONNECTED = 4 125 CONNECTED_CONNECTION_REESTABLISHMENT = 5 126 CONNECTED_SCG_FAILURE = 6 127 CONNECTED_HANDOVER = 7 128 CONNECTED_CONNECTION_RELEASE = 8 129 130 131class SchedulingMode(Enum): 132 """Supported scheduling modes.""" 133 USERDEFINEDCH = 'UDCHannels' 134 135 136class TransmissionModes(Enum): 137 """Supported transmission modes.""" 138 TM1 = 1 139 TM2 = 2 140 TM3 = 3 141 TM4 = 4 142 TM7 = 7 143 TM8 = 8 144 TM9 = 9 145 146 147# For mimo 1x1, also set_num_crs_antenna_ports to 1 148MIMO_MAX_LAYER_MAPPING = { 149 MimoModes.MIMO1x1: 2, 150 MimoModes.MIMO2x2: 2, 151 MimoModes.MIMO4x4: 4, 152} 153 154 155class Cmx500(abstract_inst.SocketInstrument): 156 def __init__(self, ip_addr, port, xlapi_path=DEFAULT_XLAPI_PATH): 157 """Init method to setup variables for the controller. 158 159 Args: 160 ip_addr: Controller's ip address. 161 port: Port. 162 """ 163 164 # keeps the socket connection for debug purpose for now 165 super().__init__(ip_addr, port) 166 if not xlapi_path in sys.path: 167 sys.path.insert(0, xlapi_path) 168 self._initial_xlapi() 169 self._settings.system.set_instrument_address(ip_addr) 170 logger.info('The instrument address is {}'.format( 171 self._settings.system.get_instrument_address())) 172 173 self.bts = [] 174 175 # Stops all active cells if there is any 176 self.clear_network() 177 178 # initialize one lte and nr cell 179 self._add_lte_cell() 180 self._add_nr_cell() 181 182 self.lte_rrc_state_change_timer = DEFAULT_LTE_STATE_CHANGE_TIMER 183 self.rrc_state_change_time_enable = False 184 self.cell_switch_on_timer = DEFAULT_CELL_SWITCH_ON_TIMER 185 186 def set_band_combination(self, bands): 187 """ Prepares the test equipment for the indicated band/mimo combination. 188 189 Args: 190 bands: a list of bands represented as ints or strings 191 mimo_modes: a list of LteSimulation.MimoMode to use for each carrier 192 """ 193 self.clear_network() 194 195 n_lte_cells = 1 196 n_nr_cells = 1 197 for band in bands: 198 if isinstance(band, str) and band[0] == 'n': 199 nr_bts = self._add_nr_cell() 200 nr_bts.set_band(int(band[1:])) 201 # set secondary cells to dl only to avoid running out of 202 # resources in high CA scenarios 203 if n_nr_cells > 2: 204 nr_bts.set_dl_only(True) 205 n_nr_cells += 1 206 else: 207 lte_bts = self._add_lte_cell() 208 lte_bts.set_band(int(band)) 209 if n_lte_cells > 2: 210 lte_bts.set_dl_only(True) 211 n_lte_cells += 1 212 213 # turn on primary lte and/or nr cells 214 self._network.apply_changes() 215 self.turn_on_primary_cells() 216 217 def _add_nr_cell(self): 218 """Creates a new NR cell and configures antenna ports.""" 219 from mrtype.counters import N310 220 nr_cell = self._network.create_nr_cell() 221 nr_cell_max_config = nr_cell.stub.GetMaximumConfiguration() 222 nr_cell_max_config.csi_rs_antenna_ports = self.MAX_CSI_RS_PORTS 223 nr_cell.stub.SetMaximumConfiguration(nr_cell_max_config) 224 # Sets n310 timer to N310.N20 to make endc more stable 225 nr_cell.set_n310(N310.N20) 226 227 nr_bts = self.create_base_station(nr_cell) 228 self.bts.append(nr_bts) 229 return nr_bts 230 231 def _add_lte_cell(self): 232 """Creates a new LTE cell and configures antenna ports.""" 233 lte_cell = self._network.create_lte_cell() 234 lte_cell_max_config = lte_cell.stub.GetMaximumConfiguration() 235 lte_cell_max_config.csi_rs_antenna_ports = self.MAX_CSI_RS_PORTS 236 lte_cell_max_config.crs_antenna_ports = self.MAX_CRS_PORTS 237 lte_cell.stub.SetMaximumConfiguration(lte_cell_max_config) 238 lte_bts = self.create_base_station(lte_cell) 239 240 self.bts.append(lte_bts) 241 return lte_bts 242 243 def _initial_xlapi(self): 244 import xlapi 245 import mrtype 246 from xlapi import network 247 from xlapi import settings 248 from rs_mrt.testenvironment.signaling.sri.rat.common import ( 249 CsiRsAntennaPorts) 250 from rs_mrt.testenvironment.signaling.sri.rat.lte import CrsAntennaPorts 251 252 self._xlapi = xlapi 253 self._network = network 254 self._settings = settings 255 256 # initialize defaults 257 self.MAX_CSI_RS_PORTS = ( 258 CsiRsAntennaPorts.NUMBER_CSI_RS_ANTENNA_PORTS_FOUR) 259 self.MAX_CRS_PORTS = CrsAntennaPorts.NUMBER_CRS_ANTENNA_PORTS_FOUR 260 261 def configure_mimo_settings(self, mimo, bts_index=0): 262 """Sets the mimo scenario for the test. 263 264 Args: 265 mimo: mimo scenario to set. 266 """ 267 self.bts[bts_index].set_mimo_mode(mimo) 268 269 @property 270 def connection_type(self): 271 """Gets the connection type applied in callbox.""" 272 state = self.dut.state.rrc_connection_state 273 return RrcConnectionState(state.value) 274 275 def create_base_station(self, cell): 276 """Creates the base station object with cell and current object. 277 278 Args: 279 cell: the XLAPI cell. 280 281 Returns: 282 base station object. 283 Raise: 284 CmxError if the cell is neither LTE nor NR. 285 """ 286 from xlapi.lte_cell import LteCell 287 from xlapi.nr_cell import NrCell 288 if isinstance(cell, LteCell): 289 return LteBaseStation(self, cell) 290 elif isinstance(cell, NrCell): 291 return NrBaseStation(self, cell) 292 else: 293 raise CmxError('The cell type is neither LTE nor NR') 294 295 def detach(self): 296 """Detach callbox and controller.""" 297 for bts in self.bts: 298 bts.stop() 299 300 def disable_packet_switching(self): 301 """Disable packet switching in call box.""" 302 raise NotImplementedError() 303 304 def clear_network(self): 305 """Wipes the network configuration.""" 306 self.bts.clear() 307 self._network.reset() 308 # dut stub becomes stale after resetting 309 self.dut = self._network.get_dut() 310 311 def disconnect(self): 312 """Disconnect controller from device and switch to local mode.""" 313 self.clear_network() 314 self._close_socket() 315 316 def enable_packet_switching(self): 317 """Enable packet switching in call box.""" 318 raise NotImplementedError() 319 320 def get_cell_configs(self, cell): 321 """Getes cell settings. 322 323 This method is for debugging purpose. In XLAPI, there are many get 324 methods in the cell or component carrier object. When this method is 325 called, the corresponding value could be recorded with logging, which is 326 useful for debug. 327 328 Args: 329 cell: the lte or nr cell in xlapi 330 """ 331 cell_getters = [attr for attr in dir(cell) if attr.startswith('get')] 332 for attr in cell_getters: 333 try: 334 getter = getattr(cell, attr) 335 logger.info('The {} is {}'.format(attr, getter())) 336 except Exception as e: 337 logger.warning('Error in get {}: {}'.format(attr, e)) 338 339 def get_base_station(self, bts_index=0): 340 """Gets the base station object based on bts num. By default 341 bts_index set to 0 (PCC). 342 343 Args: 344 bts_num: base station identifier 345 346 Returns: 347 base station object. 348 """ 349 return self.bts[bts_index] 350 351 def get_network(self): 352 """ Gets the network object from cmx500 object.""" 353 return self._network 354 355 def init_lte_measurement(self): 356 """Gets the class object for lte measurement which can be used to 357 initiate measurements. 358 359 Returns: 360 lte measurement object. 361 """ 362 raise NotImplementedError() 363 364 def network_apply_changes(self): 365 """Uses self._network to apply changes""" 366 self._network.apply_changes() 367 368 def reset(self): 369 """System level reset.""" 370 371 self.disconnect() 372 373 @property 374 def rrc_connection(self): 375 """Gets the RRC connection state.""" 376 return self.dut.state.rrc.is_connected 377 378 def set_timer(self, timeout): 379 """Sets timer for the Cmx500 class.""" 380 self.rrc_state_change_time_enable = True 381 self.lte_rrc_state_change_timer = timeout 382 383 def switch_lte_signalling(self, state): 384 """ Turns LTE signalling ON/OFF. 385 386 Args: 387 state: an instance of LteState indicating the state to which LTE 388 signal has to be set. 389 """ 390 if not isinstance(state, LteState): 391 raise ValueError('state should be the instance of LteState.') 392 393 if self.primary_lte_cell: 394 self.set_bts_enabled(state.value == 'ON', self.primary_lte_cell) 395 else: 396 raise CmxError( 397 'Unable to set LTE signalling to {},'.format(state.value) + 398 ' no LTE cell found.') 399 400 def switch_on_nsa_signalling(self): 401 logger.info('Switches on NSA signalling') 402 403 if self.primary_lte_cell: 404 self.set_bts_enabled(True, self.primary_lte_cell) 405 else: 406 raise CmxError( 407 'Unable to turn on NSA signalling, no LTE cell found.') 408 409 if self.primary_nr_cell: 410 self.set_bts_enabled(True, self.primary_nr_cell) 411 else: 412 raise CmxError( 413 'Unable to turn on NSA signalling, no NR cell found.') 414 415 time.sleep(5) 416 417 @property 418 def primary_cell(self): 419 """Gets the primary cell in the current scenario.""" 420 # If the simulation has an active cell return it as the primary 421 # otherwise default to the first cell. 422 for cell in self.bts: 423 if cell.is_primary_cell: 424 return cell 425 426 return self.bts[0] if self.bts else None 427 428 @property 429 def primary_lte_cell(self): 430 """Gets the primary LTE cell in the current scenario. 431 432 Note: This should generally be the same as primary_cell unless we're 433 connected to a NR cell in a TA with both LTE and NR cells. 434 """ 435 cell = self.primary_cell 436 if isinstance(cell, LteBaseStation): 437 return cell 438 439 # find the first cell in the same ta as the primary cell 440 elif cell: 441 cells = self._get_cells_in_same_ta(cell) 442 return next((c for c in cells if isinstance(c, LteBaseStation)), 443 None) 444 445 return None 446 447 @property 448 def primary_nr_cell(self): 449 """Gets the primary NR cell in the current scenario. 450 451 Note: This may be the PSCell for NSA scenarios. 452 """ 453 cell = self.primary_cell 454 if cell and isinstance(cell, NrBaseStation): 455 return cell 456 457 elif cell: 458 cells = self._get_cells_in_same_ta(cell) 459 return next((c for c in cells if isinstance(c, NrBaseStation)), 460 None) 461 462 return None 463 464 @property 465 def lte_cells(self): 466 """Gets all LTE cells in the current scenario.""" 467 return list( 468 [bts for bts in self.bts if isinstance(bts, LteBaseStation)]) 469 470 @property 471 def nr_cells(self): 472 """Gets all NR cells in the current scenario.""" 473 return list( 474 [bts for bts in self.bts if isinstance(bts, NrBaseStation)]) 475 476 @property 477 def secondary_lte_cells(self): 478 """Gets a list of all LTE cells in the same TA as the primary cell.""" 479 pcell = self.primary_cell 480 if not pcell or not self.lte_cells: 481 return [] 482 483 # If NR cell is primary then there are no secondary LTE cells. 484 if not isinstance(pcell, LteBaseStation): 485 return [] 486 487 return [ 488 c for c in self._get_cells_in_same_ta(pcell) 489 if isinstance(c, LteBaseStation) 490 ] 491 492 @property 493 def secondary_nr_cells(self): 494 """Gets a list of all NR cells in the same TA as the primary cell.""" 495 pcell = self.primary_nr_cell 496 if not pcell or not self.nr_cells: 497 return [] 498 499 return [ 500 c for c in self._get_cells_in_same_ta(pcell) 501 if isinstance(c, NrBaseStation) 502 ] 503 504 @property 505 def secondary_cells(self): 506 """Gets all secondary cells in the current scenario.""" 507 return self.secondary_lte_cells + self.secondary_nr_cells 508 509 @property 510 def tracking_areas(self): 511 """Returns a list of LTE and 5G tracking areas in the simulation.""" 512 plmn = self._network.get_or_create_plmn() 513 return plmn.eps_ta_list + plmn.fivegs_ta_list 514 515 def _get_cells_in_same_ta(self, bts): 516 """Returns a list of all cells in the same TA.""" 517 tracking_area = next(t for t in self.tracking_areas 518 if bts._cell in t.cells) 519 return [ 520 c for c in self.bts if c != bts and c._cell in tracking_area.cells 521 ] 522 523 def set_bts_enabled(self, enabled, bts): 524 """Switch bts signalling on/off. 525 526 Args: 527 enabled: True/False if the signalling should be enabled. 528 bts: The bts to configure. 529 """ 530 if enabled and not bts.is_on(): 531 bts.start() 532 state = bts.wait_cell_enabled(self.cell_switch_on_timer, True) 533 if state: 534 logger.info('The cell status is on') 535 else: 536 raise CmxError('The cell cannot be switched on') 537 538 elif not enabled and bts.is_on(): 539 bts.stop() 540 state = bts.wait_cell_enabled(self.cell_switch_on_timer, False) 541 if not state: 542 logger.info('The cell status is off') 543 else: 544 raise CmxError('The cell cannot be switched off') 545 546 @property 547 def use_carrier_specific(self): 548 """Gets current status of carrier specific duplex configuration.""" 549 raise NotImplementedError() 550 551 @use_carrier_specific.setter 552 def use_carrier_specific(self, state): 553 """Sets the carrier specific duplex configuration. 554 555 Args: 556 state: ON/OFF UCS configuration. 557 """ 558 raise NotImplementedError() 559 560 def set_keep_rrc(self, keep_connected): 561 """Sets if the CMX should maintain RRC connection after registration. 562 563 Args: 564 keep_connected: true if cmx should stay in RRC_CONNECTED state or 565 false if UE preference should be used. 566 """ 567 state = 'ON' if keep_connected else 'OFF' 568 self._send('CONFigure:SIGNaling:EPS:NBEHavior:KRRC {}'.format(state)) 569 self._send('CONFigure:SIGNaling:FGS:NBEHavior:KRRC {}'.format(state)) 570 571 def turn_off_neighbor_cells(self): 572 """Turns off all cells in other tracking areas.""" 573 for cell in self.bts: 574 if not cell.is_active: 575 self.set_bts_enabled(False, cell) 576 577 def turn_on_primary_cells(self): 578 """Turns on all primary cells.""" 579 if self.primary_lte_cell: 580 self.set_bts_enabled(True, self.primary_lte_cell) 581 if self.primary_nr_cell: 582 self.set_bts_enabled(True, self.primary_nr_cell) 583 584 def turn_on_secondary_cells(self): 585 """Turns on all secondary cells.""" 586 for bts in self.secondary_cells: 587 self.set_bts_enabled(True, bts) 588 589 def handover(self, primary, secondary=None): 590 """Performs a Inter/Intra-RAT handover. 591 592 Args: 593 primary: the new primary bts. 594 secondary: the new secondary bts. 595 """ 596 self.dut.signaling.handover_to(primary.cell, 597 secondary.cell if secondary else None, 598 None) 599 600 def wait_for_rrc_state(self, state, timeout=120): 601 """ Waits until a certain RRC state is set. 602 603 Args: 604 state: the RRC state that is being waited for. 605 timeout: timeout for phone to be in connected state. 606 607 Raises: 608 CmxError on time out. 609 """ 610 is_idle = (state.value == 'OFF') 611 for idx in range(timeout): 612 time.sleep(1) 613 if self.dut.state.rrc.is_idle == is_idle: 614 logger.info('{} reached at {} s'.format(state.value, idx)) 615 return True 616 error_message = 'Waiting for {} state timeout after {}'.format( 617 state.value, timeout) 618 logger.error(error_message) 619 raise CmxError(error_message) 620 621 def wait_until_attached(self, timeout=120): 622 """Waits until Lte attached. 623 624 Args: 625 timeout: timeout for phone to get attached. 626 627 Raises: 628 CmxError on time out. 629 """ 630 if not self.bts: 631 raise CmxError('cannot attach device, no cells found') 632 633 self.primary_cell.wait_for_attach(timeout) 634 635 def send_sms(self, message): 636 """ Sends an SMS message to the DUT. 637 638 Args: 639 message: the SMS message to send. 640 """ 641 self.dut.signaling.mt_sms(message) 642 643 644class BaseStation(object): 645 """Class to interact with different the base stations.""" 646 def __init__(self, cmx, cell): 647 """Init method to setup variables for base station. 648 649 Args: 650 cmx: Controller (Cmx500) object. 651 cell: The cell for the base station. 652 """ 653 654 self._cell = cell 655 self._cmx = cmx 656 self._cc = cmx.dut.cc(cell) 657 self._network = cmx.get_network() 658 659 @property 660 def band(self): 661 """Gets the current band of cell. 662 663 Return: 664 the band number in int. 665 """ 666 cell_band = self._cell.get_band() 667 return int(cell_band) 668 669 @property 670 def dl_power(self): 671 """Gets RSPRE level. 672 673 Return: 674 the power level in dbm. 675 """ 676 return self._cell.get_total_dl_power().in_dBm() 677 678 @property 679 def duplex_mode(self): 680 """Gets current duplex of cell.""" 681 band = self._cell.get_band() 682 if band.is_fdd(): 683 return DuplexMode.FDD 684 if band.is_tdd(): 685 return DuplexMode.TDD 686 if band.is_dl_only(): 687 return DuplexMode.DL_ONLY 688 689 def get_cc(self): 690 """Gets component carrier of the cell.""" 691 return self._cc 692 693 def is_on(self): 694 """Verifies if the cell is turned on. 695 696 Return: 697 boolean (if the cell is on). 698 """ 699 return self._cell.is_on() 700 701 def set_band(self, band): 702 """Sets the Band of cell. 703 704 Args: 705 band: band of cell. 706 """ 707 self._cell.set_band(band) 708 logger.info('The band is set to {} and is {} after setting'.format( 709 band, self.band)) 710 711 def set_dl_mac_padding(self, state): 712 """Enables/Disables downlink padding at the mac layer. 713 714 Args: 715 state: a boolean 716 """ 717 self._cc.set_dl_mac_padding(state) 718 719 def set_dl_power(self, pwlevel): 720 """Modifies RSPRE level. 721 722 Args: 723 pwlevel: power level in dBm. 724 """ 725 self._cell.set_total_dl_power(pwlevel) 726 727 def set_ul_power(self, ul_power): 728 """Sets ul power 729 730 Args: 731 ul_power: the uplink power in dbm 732 """ 733 self._cc.set_target_ul_power(ul_power) 734 735 def set_dl_only(self, dl_only): 736 """Sets if the cell should be in downlink only mode. 737 738 Args: 739 dl_only: bool indicating if the cell should be downlink only. 740 """ 741 self._cell.dl_only = dl_only 742 743 def start(self): 744 """Starts the cell.""" 745 self._cell.start() 746 747 def stop(self): 748 """Stops the cell.""" 749 self._cell.stop() 750 751 def wait_cell_enabled(self, timeout, enabled): 752 """Waits for the requested cell state. 753 754 Args: 755 timeout: the time for waiting the cell on. 756 enabled: true if the cell should be enabled. 757 758 Raises: 759 CmxError on time out. 760 """ 761 waiting_time = 0 762 while waiting_time < timeout: 763 if self._cell.is_on() == enabled: 764 return enabled 765 waiting_time += 1 766 time.sleep(1) 767 return self._cell.is_on() 768 769 def set_tracking_area(self, tac): 770 """Sets the cells tracking area. 771 772 Args: 773 tac: the tracking area ID to add the cell to. 774 """ 775 plmn = self._network.get_or_create_plmn() 776 old_ta = next((t for t in (plmn.eps_ta_list + plmn.fivegs_ta_list) 777 if self._cell in t.cells)) 778 new_ta = next((t for t in (plmn.eps_ta_list + plmn.fivegs_ta_list) 779 if t.tac == tac), None) 780 if not new_ta: 781 new_ta = self.create_new_ta(tac) 782 783 if old_ta == new_ta: 784 return 785 786 new_ta.add_cell(self._cell) 787 old_ta.remove_cell(self._cell) 788 789 @property 790 def is_primary_cell(self): 791 """Returns true if the cell is the current primary cell.""" 792 return self._cell == self._cmx.dut.state.pcell 793 794 @property 795 def is_active(self): 796 """Returns true if the cell is part of the current simulation. 797 798 A cell is considered "active" if it is the primary cell 799 or is in the same TA as the primary cell, i.e. not a neighbor cell. 800 """ 801 return (self == self._cmx.primary_lte_cell 802 or self == self._cmx.primary_nr_cell 803 or self in self._cmx.secondary_cells) 804 805 @property 806 def cell(self): 807 """Returns the underlying XLAPI cell object.""" 808 return self._cell 809 810 811class LteBaseStation(BaseStation): 812 """ LTE base station.""" 813 def __init__(self, cmx, cell): 814 """Init method to setup variables for the LTE base station. 815 816 Args: 817 cmx: Controller (Cmx500) object. 818 cell: The cell for the LTE base station. 819 """ 820 from xlapi.lte_cell import LteCell 821 if not isinstance(cell, LteCell): 822 raise CmxError( 823 'The cell is not a LTE cell, LTE base station fails' 824 ' to create.') 825 super().__init__(cmx, cell) 826 827 def _config_scheduler(self, 828 dl_mcs=None, 829 dl_rb_alloc=None, 830 dl_dci_ncce=None, 831 dl_dci_format=None, 832 dl_tm=None, 833 dl_num_layers=None, 834 dl_mcs_table=None, 835 ul_mcs=None, 836 ul_rb_alloc=None, 837 ul_dci_ncce=None): 838 839 from rs_mrt.testenvironment.signaling.sri.rat.lte import DciFormat 840 from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode 841 from rs_mrt.testenvironment.signaling.sri.rat.lte import MaxLayersMIMO 842 from rs_mrt.testenvironment.signaling.sri.rat.lte import McsTable 843 from rs_mrt.testenvironment.signaling.sri.rat.lte import PdcchFormat 844 845 log_list = [] 846 if dl_mcs: 847 log_list.append('dl_mcs: {}'.format(dl_mcs)) 848 if ul_mcs: 849 log_list.append('ul_mcs: {}'.format(ul_mcs)) 850 if dl_rb_alloc: 851 if not isinstance(dl_rb_alloc, tuple): 852 dl_rb_alloc = (0, dl_rb_alloc) 853 log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc)) 854 if ul_rb_alloc: 855 if not isinstance(ul_rb_alloc, tuple): 856 ul_rb_alloc = (0, ul_rb_alloc) 857 log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc)) 858 if dl_dci_ncce: 859 dl_dci_ncce = PdcchFormat(dl_dci_ncce) 860 log_list.append('dl_dci_ncce: {}'.format(dl_dci_ncce)) 861 if ul_dci_ncce: 862 ul_dci_ncce = PdcchFormat(ul_dci_ncce) 863 log_list.append('ul_dci_ncce: {}'.format(ul_dci_ncce)) 864 if dl_dci_format: 865 dl_dci_format = DciFormat(dl_dci_format) 866 log_list.append('dl_dci_format: {}'.format(dl_dci_format)) 867 if dl_tm: 868 dl_tm = DlTransmissionMode(dl_tm.value) 869 log_list.append('dl_tm: {}'.format(dl_tm)) 870 if dl_num_layers: 871 dl_num_layers = MaxLayersMIMO(dl_num_layers) 872 log_list.append('dl_num_layers: {}'.format(dl_num_layers)) 873 if dl_mcs_table: 874 dl_mcs_table = McsTable(dl_mcs_table) 875 log_list.append('dl_mcs_table: {}'.format(dl_mcs_table)) 876 877 num_crs_antenna_ports = self._cell.get_num_crs_antenna_ports() 878 879 # Sets num of crs antenna ports to 4 for configuring 880 self._cell.set_num_crs_antenna_ports(4) 881 scheduler = self._cmx.dut.get_scheduler(self._cell) 882 logger.info('configure scheduler for {}'.format(','.join(log_list))) 883 scheduler.configure_scheduler(dl_mcs=dl_mcs, 884 dl_rb_alloc=dl_rb_alloc, 885 dl_dci_ncce=dl_dci_ncce, 886 dl_dci_format=dl_dci_format, 887 dl_tm=dl_tm, 888 dl_num_layers=dl_num_layers, 889 dl_mcs_table=dl_mcs_table, 890 ul_mcs=ul_mcs, 891 ul_rb_alloc=ul_rb_alloc, 892 ul_dci_ncce=ul_dci_ncce) 893 logger.info('Configure scheduler succeeds') 894 895 # Sets num of crs antenna ports back to previous value 896 self._cell.set_num_crs_antenna_ports(num_crs_antenna_ports) 897 self._network.apply_changes() 898 899 @property 900 def bandwidth(self): 901 """Get the channel bandwidth of the cell. 902 903 Return: 904 the number rb of the bandwidth. 905 """ 906 return self._cell.get_bandwidth().num_rb 907 908 @property 909 def dl_channel(self): 910 """Gets the downlink channel of cell. 911 912 Return: 913 the downlink channel (earfcn) in int. 914 """ 915 return int(self._cell.get_dl_earfcn()) 916 917 @property 918 def dl_frequency(self): 919 """Get the downlink frequency of the cell.""" 920 from mrtype.frequency import Frequency 921 return self._cell.get_dl_earfcn().to_freq().in_units( 922 Frequency.Units.GHz) 923 924 def _to_rb_bandwidth(self, bandwidth): 925 for idx in range(5): 926 if bandwidth < LTE_MHZ_UPPER_BOUND_TO_RB[idx][0]: 927 return LTE_MHZ_UPPER_BOUND_TO_RB[idx][1] 928 return 100 929 930 def disable_all_ul_subframes(self): 931 """Disables all ul subframes for LTE cell.""" 932 self._cc.disable_all_ul_subframes() 933 self._network.apply_changes() 934 logger.info('lte cell disable all ul subframes completed') 935 936 def set_bandwidth(self, bandwidth): 937 """Sets the channel bandwidth of the cell. 938 939 Args: 940 bandwidth: channel bandwidth of cell in MHz. 941 """ 942 self._cell.set_bandwidth(self._to_rb_bandwidth(bandwidth)) 943 self._network.apply_changes() 944 945 def set_cdrx_config(self, config): 946 """Configures LTE cdrx with lte config parameters. 947 948 config: The LteCellConfig for current base station. 949 """ 950 951 logger.info( 952 f'Configure Lte drx with\n' 953 f'drx_on_duration_timer: {config.drx_on_duration_timer}\n' 954 f'drx_inactivity_timer: {config.drx_inactivity_timer}\n' 955 f'drx_retransmission_timer: {config.drx_retransmission_timer}\n' 956 f'drx_long_cycle: {config.drx_long_cycle}\n' 957 f'drx_long_cycle_offset: {config.drx_long_cycle_offset}' 958 ) 959 960 from mrtype.lte.drx import ( 961 LteDrxConfig, 962 LteDrxInactivityTimer, 963 LteDrxOnDurationTimer, 964 LteDrxRetransmissionTimer, 965 ) 966 967 from mrtype.lte.drx import LteDrxLongCycleStartOffset as longCycle 968 969 long_cycle_mapping = { 970 10: longCycle.ms10, 20: longCycle.ms20, 32: longCycle.ms32, 971 40: longCycle.ms40, 60: longCycle.ms60, 64: longCycle.ms64, 972 70: longCycle.ms70, 80: longCycle.ms80, 128: longCycle.ms128, 973 160: longCycle.ms160, 256: longCycle.ms256, 320: longCycle.ms320, 974 512: longCycle.ms512, 640: longCycle.ms640, 1280: longCycle.ms1280, 975 2048: longCycle.ms2048, 2560: longCycle.ms2560 976 } 977 978 drx_on_duration_timer = LteDrxOnDurationTimer( 979 int(config.drx_on_duration_timer) 980 ) 981 drx_inactivity_timer = LteDrxInactivityTimer( 982 int(config.drx_inactivity_timer) 983 ) 984 drx_retransmission_timer = LteDrxRetransmissionTimer( 985 int(config.drx_retransmission_timer) 986 ) 987 drx_long_cycle = long_cycle_mapping[int(config.drx_long_cycle)] 988 drx_long_cycle_offset = drx_long_cycle(config.drx_long_cycle_offset) 989 990 lte_drx_config = LteDrxConfig( 991 on_duration_timer=drx_on_duration_timer, 992 inactivity_timer=drx_inactivity_timer, 993 retransmission_timer=drx_retransmission_timer, 994 long_cycle_start_offset=drx_long_cycle_offset, 995 short_drx=None 996 ) 997 998 self._cmx.dut.lte_cell_group().set_drx_and_adjust_scheduler( 999 drx_config=lte_drx_config 1000 ) 1001 self._network.apply_changes() 1002 1003 def set_default_cdrx_config(self): 1004 """Sets default LTE cdrx config for endc (for legacy code).""" 1005 from mrtype.lte.drx import ( 1006 LteDrxConfig, 1007 LteDrxInactivityTimer, 1008 LteDrxLongCycleStartOffset, 1009 LteDrxOnDurationTimer, 1010 LteDrxRetransmissionTimer, 1011 ) 1012 1013 logger.info('Config Lte drx config') 1014 lte_drx_config = LteDrxConfig( 1015 on_duration_timer=LteDrxOnDurationTimer.PSF_10, 1016 inactivity_timer=LteDrxInactivityTimer.PSF_200, 1017 retransmission_timer=LteDrxRetransmissionTimer.PSF_33, 1018 long_cycle_start_offset=LteDrxLongCycleStartOffset.ms160(83), 1019 short_drx=None) 1020 self._cmx.dut.lte_cell_group().set_drx_and_adjust_scheduler( 1021 drx_config=lte_drx_config) 1022 self._network.apply_changes() 1023 1024 def set_cell_frequency_band(self, tdd_cfg=None, ssf_cfg=None): 1025 """Sets cell frequency band with tdd and ssf config. 1026 1027 Args: 1028 tdd_cfg: the tdd subframe assignment config in number (from 0-6). 1029 ssf_cfg: the special subframe pattern config in number (from 1-9). 1030 """ 1031 from rs_mrt.testenvironment.signaling.sri.rat.lte import SpecialSubframePattern 1032 from rs_mrt.testenvironment.signaling.sri.rat.lte import SubFrameAssignment 1033 from rs_mrt.testenvironment.signaling.sri.rat.lte.config import CellFrequencyBand 1034 from rs_mrt.testenvironment.signaling.sri.rat.lte.config import Tdd 1035 tdd_subframe = None 1036 ssf_pattern = None 1037 if tdd_cfg: 1038 tdd_subframe = SubFrameAssignment(tdd_cfg + 1) 1039 if ssf_cfg: 1040 ssf_pattern = SpecialSubframePattern(ssf_cfg) 1041 tdd = Tdd(tdd_config=Tdd.TddConfigSignaling( 1042 subframe_assignment=tdd_subframe, 1043 special_subframe_pattern=ssf_pattern)) 1044 self._cell.stub.SetCellFrequencyBand(CellFrequencyBand(tdd=tdd)) 1045 self._network.apply_changes() 1046 1047 def set_cfi(self, cfi): 1048 """Sets number of pdcch symbols (cfi). 1049 1050 Args: 1051 cfi: the value of NumberOfPdcchSymbols 1052 """ 1053 from rs_mrt.testenvironment.signaling.sri.rat.lte import NumberOfPdcchSymbols 1054 from rs_mrt.testenvironment.signaling.sri.rat.lte.config import PdcchRegionReq 1055 1056 logger.info('The cfi enum to set is {}'.format( 1057 NumberOfPdcchSymbols(cfi))) 1058 req = PdcchRegionReq() 1059 req.num_pdcch_symbols = NumberOfPdcchSymbols(cfi) 1060 self._cell.stub.SetPdcchControlRegion(req) 1061 1062 def set_dci_format(self, dci_format): 1063 """Selects the downlink control information (DCI) format. 1064 1065 Args: 1066 dci_format: supported dci. 1067 """ 1068 if not isinstance(dci_format, DciFormat): 1069 raise CmxError('Wrong type for dci_format') 1070 self._config_scheduler(dl_dci_format=dci_format.value) 1071 1072 def set_dl_channel(self, channel): 1073 """Sets the downlink channel number of cell. 1074 1075 Args: 1076 channel: downlink channel number of cell. 1077 """ 1078 if self.dl_channel == channel: 1079 logger.info('The dl_channel was at {}'.format(self.dl_channel)) 1080 return 1081 self._cell.set_earfcn(channel) 1082 logger.info('The dl_channel was set to {}'.format(self.dl_channel)) 1083 1084 def set_dl_modulation_table(self, modulation): 1085 """Sets down link modulation table. 1086 1087 Args: 1088 modulation: modulation table setting (ModulationType). 1089 """ 1090 if not isinstance(modulation, ModulationType): 1091 raise CmxError('The modulation is not the type of Modulation') 1092 self._config_scheduler(dl_mcs_table=modulation.value) 1093 1094 def set_mimo_mode(self, mimo): 1095 """Sets mimo mode for Lte scenario. 1096 1097 Args: 1098 mimo: the mimo mode. 1099 """ 1100 if not isinstance(mimo, MimoModes): 1101 raise CmxError("Wrong type of mimo mode") 1102 1103 self._cell.set_num_crs_antenna_ports(mimo.value) 1104 1105 def set_scheduling_mode(self, 1106 mcs_dl=None, 1107 mcs_ul=None, 1108 nrb_dl=None, 1109 nrb_ul=None): 1110 """Sets scheduling mode. 1111 1112 Args: 1113 scheduling: the new scheduling mode. 1114 mcs_dl: Downlink MCS. 1115 mcs_ul: Uplink MCS. 1116 nrb_dl: Number of RBs for downlink. 1117 nrb_ul: Number of RBs for uplink. 1118 """ 1119 self._config_scheduler(dl_mcs=mcs_dl, 1120 ul_mcs=mcs_ul, 1121 dl_rb_alloc=nrb_dl, 1122 ul_rb_alloc=nrb_ul) 1123 1124 def set_ssf_config(self, ssf_config): 1125 """Sets ssf subframe assignment with tdd_config. 1126 1127 Args: 1128 ssf_config: the special subframe pattern config (from 1-9). 1129 """ 1130 self.set_cell_frequency_band(ssf_cfg=ssf_config) 1131 1132 def set_tdd_config(self, tdd_config): 1133 """Sets tdd subframe assignment with tdd_config. 1134 1135 Args: 1136 tdd_config: the subframe assignemnt config (from 0-6). 1137 """ 1138 self.set_cell_frequency_band(tdd_cfg=tdd_config) 1139 1140 def set_transmission_mode(self, transmission_mode): 1141 """Sets transmission mode with schedular. 1142 1143 Args: 1144 transmission_mode: the download link transmission mode. 1145 """ 1146 from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode 1147 if not isinstance(transmission_mode, TransmissionModes): 1148 raise CmxError('Wrong type of the trasmission mode') 1149 dl_tm = DlTransmissionMode(transmission_mode.value) 1150 logger.info('set dl tm to {}'.format(dl_tm)) 1151 self._cc.set_dl_tm(dl_tm) 1152 self._network.apply_changes() 1153 1154 def set_ul_channel(self, channel): 1155 """Sets the up link channel number of cell. 1156 1157 Args: 1158 channel: up link channel number of cell. 1159 """ 1160 if self.ul_channel == channel: 1161 logger.info('The ul_channel is at {}'.format(self.ul_channel)) 1162 return 1163 self._cell.set_earfcn(channel) 1164 logger.info('The dl_channel was set to {}'.format(self.ul_channel)) 1165 1166 @property 1167 def ul_channel(self): 1168 """Gets the uplink channel of cell. 1169 1170 Return: 1171 the uplink channel (earfcn) in int 1172 """ 1173 return int(self._cell.get_ul_earfcn()) 1174 1175 @property 1176 def ul_frequency(self): 1177 """Get the uplink frequency of the cell. 1178 1179 Return: 1180 The uplink frequency in GHz. 1181 """ 1182 from mrtype.frequency import Frequency 1183 return self._cell.get_ul_earfcn().to_freq().in_units( 1184 Frequency.Units.GHz) 1185 1186 def set_ul_modulation_table(self, modulation): 1187 """Sets up link modulation table. 1188 1189 Args: 1190 modulation: modulation table setting (ModulationType). 1191 """ 1192 if not isinstance(modulation, ModulationType): 1193 raise CmxError('The modulation is not the type of Modulation') 1194 if modulation == ModulationType.Q16: 1195 self._cell.stub.SetPuschCommonConfig(False) 1196 else: 1197 self._cell.stub.SetPuschCommonConfig(True) 1198 1199 def create_new_ta(self, tac): 1200 """Creates and initializes a new tracking area for this cell. 1201 1202 Args: 1203 tac: the tracking area ID of the new cell. 1204 Returns: 1205 ta: the new tracking area. 1206 """ 1207 plmn = self._network.get_or_create_plmn() 1208 return plmn.create_eps_ta(tac) 1209 1210 def wait_for_attach(self, timeout): 1211 self._cmx.dut.signaling.wait_for_lte_attach(self._cell, timeout) 1212 1213 def attach_as_secondary_cell(self): 1214 """Attach this cell as a secondary cell.""" 1215 self._cmx.dut.signaling.ca_add_scell(self._cell) 1216 1217 1218class NrBaseStation(BaseStation): 1219 """ NR base station.""" 1220 def __init__(self, cmx, cell): 1221 """Init method to setup variables for the NR base station. 1222 1223 Args: 1224 cmx: Controller (Cmx500) object. 1225 cell: The cell for the NR base station. 1226 """ 1227 from xlapi.nr_cell import NrCell 1228 if not isinstance(cell, NrCell): 1229 raise CmxError('the cell is not a NR cell, NR base station fails' 1230 ' to creat.') 1231 1232 super().__init__(cmx, cell) 1233 1234 def _config_scheduler(self, 1235 dl_mcs=None, 1236 dl_mcs_table=None, 1237 dl_rb_alloc=None, 1238 dl_mimo_mode=None, 1239 ul_mcs=None, 1240 ul_mcs_table=None, 1241 ul_rb_alloc=None, 1242 ul_mimo_mode=None): 1243 1244 from rs_mrt.testenvironment.signaling.sri.rat.nr import McsTable 1245 1246 log_list = [] 1247 if dl_mcs: 1248 log_list.append('dl_mcs: {}'.format(dl_mcs)) 1249 if ul_mcs: 1250 log_list.append('ul_mcs: {}'.format(ul_mcs)) 1251 1252 # If rb alloc is not a tuple, add 0 as start RBs for XLAPI NR scheduler 1253 if dl_rb_alloc: 1254 if not isinstance(dl_rb_alloc, tuple): 1255 dl_rb_alloc = (0, dl_rb_alloc) 1256 log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc)) 1257 if ul_rb_alloc: 1258 if not isinstance(ul_rb_alloc, tuple): 1259 ul_rb_alloc = (0, ul_rb_alloc) 1260 log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc)) 1261 if dl_mcs_table: 1262 dl_mcs_table = McsTable(dl_mcs_table) 1263 log_list.append('dl_mcs_table: {}'.format(dl_mcs_table)) 1264 if ul_mcs_table: 1265 ul_mcs_table = McsTable(ul_mcs_table) 1266 log_list.append('ul_mcs_table: {}'.format(ul_mcs_table)) 1267 if dl_mimo_mode: 1268 log_list.append('dl_mimo_mode: {}'.format(dl_mimo_mode)) 1269 if ul_mimo_mode: 1270 log_list.append('ul_mimo_mode: {}'.format(ul_mimo_mode)) 1271 1272 scheduler = self._cmx.dut.get_scheduler(self._cell) 1273 logger.info('configure scheduler for {}'.format(','.join(log_list))) 1274 1275 scheduler.configure_ue_scheduler(dl_mcs=dl_mcs, 1276 dl_mcs_table=dl_mcs_table, 1277 dl_rb_alloc=dl_rb_alloc, 1278 dl_mimo_mode=dl_mimo_mode, 1279 ul_mcs=ul_mcs, 1280 ul_mcs_table=ul_mcs_table, 1281 ul_rb_alloc=ul_rb_alloc, 1282 ul_mimo_mode=ul_mimo_mode) 1283 logger.info('Configure scheduler succeeds') 1284 self._network.apply_changes() 1285 1286 def wait_for_attach(self, timeout): 1287 self._cmx.dut.signaling.wait_for_nr_registration(self._cell) 1288 1289 def attach_as_secondary_cell(self, scg=False): 1290 """Attach this cell as a secondary cell. 1291 1292 Args: 1293 scg: bool specifing if the cell should be added as part of the 1294 secondary cell group. 1295 """ 1296 if scg: 1297 self._cmx.dut.signaling.ca_add_scg_scell(self._cell) 1298 else: 1299 self._cmx.dut.signaling.ca_add_scell(self._cell) 1300 1301 def activate_endc(self, endc_timer=DEFAULT_ENDC_TIMER): 1302 """Enable endc mode for NR cell. 1303 1304 Args: 1305 endc_timer: timeout for endc state 1306 """ 1307 logger.info('enable endc mode for nsa dual connection') 1308 self._cmx.dut.signaling.nsa_dual_connect(self._cell) 1309 time_count = 0 1310 while time_count < endc_timer: 1311 if str(self._cmx.dut.state.radio_connectivity) == \ 1312 'RadioConnectivityMode.EPS_LTE_NR': 1313 logger.info('enter endc mode') 1314 return 1315 time.sleep(1) 1316 time_count += 1 1317 if time_count % 30 == 0: 1318 logger.info('did not reach endc at {} s'.format(time_count)) 1319 raise CmxError('Cannot reach endc after {} s'.format(endc_timer)) 1320 1321 def config_flexible_slots(self): 1322 """Configs flexible slots for NR cell.""" 1323 1324 from rs_mrt.testenvironment.signaling.sri.rat.nr.config import CellFrequencyBandReq 1325 from rs_mrt.testenvironment.signaling.sri.rat.common import SetupRelease 1326 1327 req = CellFrequencyBandReq() 1328 req.band.frequency_range.fr1.tdd.tdd_config_common.setup_release = SetupRelease.RELEASE 1329 self._cell.stub.SetCellFrequencyBand(req) 1330 self._network.apply_changes() 1331 logger.info('Config flexible slots completed') 1332 1333 def disable_all_ul_slots(self): 1334 """Disable all uplink scheduling slots for NR cell""" 1335 self._cc.disable_all_ul_slots() 1336 self._network.apply_changes() 1337 logger.info('Disabled all uplink scheduling slots for the nr cell') 1338 1339 @property 1340 def dl_channel(self): 1341 """Gets the downlink channel of cell. 1342 1343 Return: 1344 the downlink channel (nr_arfcn) in int. 1345 """ 1346 return int(self._cell.get_dl_ref_a()) 1347 1348 def _bandwidth_to_carrier_bandwidth(self, bandwidth): 1349 """Converts bandwidth in MHz to CarrierBandwidth. 1350 CarrierBandwidth Enum in XLAPI: 1351 MHZ_5 = 0 1352 MHZ_10 = 1 1353 MHZ_15 = 2 1354 MHZ_20 = 3 1355 MHZ_25 = 4 1356 MHZ_30 = 5 1357 MHZ_40 = 6 1358 MHZ_50 = 7 1359 MHZ_60 = 8 1360 MHZ_70 = 9 1361 MHZ_80 = 10 1362 MHZ_90 = 11 1363 MHZ_100 = 12 1364 MHZ_200 = 13 1365 MHZ_400 = 14 1366 Args: 1367 bandwidth: channel bandwidth in MHz. 1368 1369 Return: 1370 the corresponding NR Carrier Bandwidth. 1371 """ 1372 from mrtype.nr.frequency import CarrierBandwidth 1373 if bandwidth > 100: 1374 return CarrierBandwidth(12 + bandwidth // 200) 1375 elif bandwidth > 30: 1376 return CarrierBandwidth(2 + bandwidth // 10) 1377 else: 1378 return CarrierBandwidth(bandwidth // 5 - 1) 1379 1380 def set_bandwidth(self, bandwidth, scs=None): 1381 """Sets the channel bandwidth of the cell. 1382 1383 Args: 1384 bandwidth: channel bandwidth of cell. 1385 scs: subcarrier spacing (SCS) of resource grid 0 1386 """ 1387 if not scs: 1388 scs = self._cell.get_scs() 1389 self._cell.set_carrier_bandwidth_and_scs( 1390 self._bandwidth_to_carrier_bandwidth(bandwidth), scs) 1391 logger.info( 1392 'The bandwidth in MHz is {}. After setting, the value is {}'. 1393 format(bandwidth, str(self._cell.get_carrier_bandwidth()))) 1394 1395 def set_cdrx_config(self, config): 1396 """Configures NR cdrx with nr config parameters. 1397 1398 config: The NrCellConfig for current base station. 1399 """ 1400 logger.info( 1401 f'Configure Nr drx with\n' 1402 f'drx_on_duration_timer: {config.drx_on_duration_timer}\n' 1403 f'drx_inactivity_timer: {config.drx_inactivity_timer}\n' 1404 f'drx_retransmission_timer_dl: {config.drx_retransmission_timer_dl}\n' 1405 f'drx_retransmission_timer_ul: {config.drx_retransmission_timer_ul}\n' 1406 f'drx_long_cycle: {config.drx_long_cycle}\n' 1407 f'drx_long_cycle_offset: {config.drx_long_cycle_offset}\n' 1408 f'harq_rtt_timer_dl: {config.harq_rtt_timer_dl}\n' 1409 f'harq_rtt_timer_ul: {config.harq_rtt_timer_ul}\n' 1410 f'slot_offset: {config.slot_offset}\n' 1411 ) 1412 1413 from mrtype.nr.drx import ( 1414 NrDrxConfig, 1415 NrDrxInactivityTimer, 1416 NrDrxOnDurationTimer, 1417 NrDrxRetransmissionTimer, 1418 NrDrxHarqRttTimer, 1419 NrDrxSlotOffset, 1420 ) 1421 1422 from mrtype.nr.drx import NrDrxLongCycleStartOffset as longCycle 1423 1424 long_cycle_mapping = { 1425 10: longCycle.ms10, 20: longCycle.ms20, 32: longCycle.ms32, 1426 40: longCycle.ms40, 60: longCycle.ms60, 64: longCycle.ms64, 1427 70: longCycle.ms70, 80: longCycle.ms80, 128: longCycle.ms128, 1428 160: longCycle.ms160, 256: longCycle.ms256, 320: longCycle.ms320, 1429 512: longCycle.ms512, 640: longCycle.ms640, 1024: longCycle.ms1024, 1430 1280: longCycle.ms1280, 2048: longCycle.ms2048, 1431 2560: longCycle.ms2560, 1432 } 1433 1434 drx_on_duration_timer = NrDrxOnDurationTimer( 1435 int(config.drx_on_duration_timer) 1436 ) 1437 drx_inactivity_timer = NrDrxInactivityTimer( 1438 int(config.drx_inactivity_timer) 1439 ) 1440 drx_retransmission_timer_dl = NrDrxRetransmissionTimer( 1441 int(config.drx_retransmission_timer_dl) 1442 ) 1443 drx_retransmission_timer_ul = NrDrxRetransmissionTimer( 1444 int(config.drx_retransmission_timer_ul) 1445 ) 1446 drx_long_cycle = long_cycle_mapping[int(config.drx_long_cycle)] 1447 drx_long_cycle_offset = drx_long_cycle( 1448 int(config.drx_long_cycle_offset) 1449 ) 1450 harq_rtt_timer_dl = NrDrxHarqRttTimer(config.harq_rtt_timer_dl) 1451 harq_rtt_timer_ul = NrDrxHarqRttTimer(config.harq_rtt_timer_ul) 1452 slot_offset=NrDrxSlotOffset(config.slot_offset) 1453 1454 nr_drx_config = NrDrxConfig( 1455 on_duration_timer=drx_on_duration_timer, 1456 inactivity_timer=drx_inactivity_timer, 1457 retransmission_timer_dl=drx_retransmission_timer_dl, 1458 retransmission_timer_ul=drx_retransmission_timer_ul, 1459 long_cycle_start_offset=drx_long_cycle_offset, 1460 harq_rtt_timer_dl=harq_rtt_timer_dl, 1461 harq_rtt_timer_ul=harq_rtt_timer_ul, 1462 slot_offset=slot_offset, 1463 ) 1464 1465 self._cmx.dut.nr_cell_group().set_drx_and_adjust_scheduler( 1466 nr_drx_config 1467 ) 1468 self._network.apply_changes() 1469 1470 def set_dl_channel(self, channel): 1471 """Sets the downlink channel number of cell. 1472 1473 Args: 1474 channel: downlink channel number of cell or Frequency Range ('LOW', 1475 'MID' or 'HIGH'). 1476 """ 1477 from mrtype.nr.frequency import NrArfcn 1478 from mrtype.frequency import FrequencyRange 1479 1480 # When the channel is string, set it use Frenquency Range 1481 if isinstance(channel, str): 1482 logger.info('Sets dl channel Frequency Range {}'.format(channel)) 1483 frequency_range = FrequencyRange.LOW 1484 if channel.upper() == 'MID': 1485 frequency_range = FrequencyRange.MID 1486 elif channel.upper() == 'HIGH': 1487 frequency_range = FrequencyRange.HIGH 1488 self._cell.set_dl_ref_a_offset(self.band, frequency_range) 1489 logger.info('The dl_channel was set to {}'.format(self.dl_channel)) 1490 return 1491 if self.dl_channel == channel: 1492 logger.info('The dl_channel was at {}'.format(self.dl_channel)) 1493 return 1494 self._cell.set_dl_ref_a_offset(self.band, NrArfcn(channel)) 1495 logger.info('The dl_channel was set to {}'.format(self.dl_channel)) 1496 1497 def set_dl_modulation_table(self, modulation): 1498 """Sets down link modulation table. 1499 1500 Args: 1501 modulation: modulation table setting (ModulationType). 1502 """ 1503 if not isinstance(modulation, ModulationType): 1504 raise CmxError('The modulation is not the type of Modulation') 1505 self._config_scheduler(dl_mcs_table=modulation.value) 1506 1507 def set_mimo_mode(self, mimo): 1508 """Sets mimo mode for NR nsa scenario. 1509 1510 Args: 1511 mimo: the mimo mode. 1512 """ 1513 from rs_mrt.testenvironment.signaling.sri.rat.nr import DownlinkMimoMode 1514 if not isinstance(mimo, MimoModes): 1515 raise CmxError("Wrong type of mimo mode") 1516 1517 self._config_scheduler(dl_mimo_mode=DownlinkMimoMode.Enum(mimo.value)) 1518 1519 def set_scheduling_mode(self, 1520 mcs_dl=None, 1521 mcs_ul=None, 1522 nrb_dl=None, 1523 nrb_ul=None): 1524 """Sets scheduling mode. 1525 1526 Args: 1527 mcs_dl: Downlink MCS. 1528 mcs_ul: Uplink MCS. 1529 nrb_dl: Number of RBs for downlink. 1530 nrb_ul: Number of RBs for uplink. 1531 """ 1532 self._config_scheduler(dl_mcs=mcs_dl, 1533 ul_mcs=mcs_ul, 1534 dl_rb_alloc=nrb_dl, 1535 ul_rb_alloc=nrb_ul) 1536 1537 def set_ssf_config(self, ssf_config): 1538 """Sets ssf subframe assignment with tdd_config. 1539 1540 Args: 1541 ssf_config: the special subframe pattern config (from 1-9). 1542 """ 1543 raise CmxError('the set ssf config for nr did not implemente yet') 1544 1545 def set_tdd_config(self, tdd_config): 1546 """Sets tdd subframe assignment with tdd_config. 1547 1548 Args: 1549 tdd_config: the subframe assignemnt config (from 0-6). 1550 """ 1551 raise CmxError('the set tdd config for nr did not implemente yet') 1552 1553 def set_transmission_mode(self, transmission_mode): 1554 """Sets transmission mode with schedular. 1555 1556 Args: 1557 transmission_mode: the download link transmission mode. 1558 """ 1559 logger.info('The set transmission mode for nr is set by mimo mode') 1560 1561 def set_ul_modulation_table(self, modulation): 1562 """Sets down link modulation table. 1563 1564 Args: 1565 modulation: modulation table setting (ModulationType). 1566 """ 1567 if not isinstance(modulation, ModulationType): 1568 raise CmxError('The modulation is not the type of Modulation') 1569 self._config_scheduler(ul_mcs_table=modulation.value) 1570 1571 def create_new_ta(self, tac): 1572 """Creates and initializes a new tracking area for this cell. 1573 1574 Args: 1575 tac: the tracking area ID of the new cell. 1576 Returns: 1577 ta: the new tracking area. 1578 """ 1579 plmn = self._network.get_or_create_plmn() 1580 return plmn.create_fivegs_ta(tac) 1581 1582 1583class CmxError(Exception): 1584 """Class to raise exceptions related to cmx.""" 1585