1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import time 18from enum import Enum 19 20from acts.controllers.rohdeschwarz_lib import cmx500 21from acts.controllers.rohdeschwarz_lib.cmx500 import LteBandwidth 22from acts.controllers.rohdeschwarz_lib.cmx500 import LteState 23from acts.controllers import cellular_simulator as cc 24from acts.controllers.cellular_lib import LteSimulation 25 26CMX_TM_MAPPING = { 27 LteSimulation.TransmissionMode.TM1: cmx500.TransmissionModes.TM1, 28 LteSimulation.TransmissionMode.TM2: cmx500.TransmissionModes.TM2, 29 LteSimulation.TransmissionMode.TM3: cmx500.TransmissionModes.TM3, 30 LteSimulation.TransmissionMode.TM4: cmx500.TransmissionModes.TM4, 31 LteSimulation.TransmissionMode.TM7: cmx500.TransmissionModes.TM7, 32 LteSimulation.TransmissionMode.TM8: cmx500.TransmissionModes.TM8, 33 LteSimulation.TransmissionMode.TM9: cmx500.TransmissionModes.TM9, 34} 35 36CMX_SCH_MAPPING = { 37 LteSimulation.SchedulingMode.STATIC: cmx500.SchedulingMode.USERDEFINEDCH 38} 39 40CMX_MIMO_MAPPING = { 41 LteSimulation.MimoMode.MIMO_1x1: cmx500.MimoModes.MIMO1x1, 42 LteSimulation.MimoMode.MIMO_2x2: cmx500.MimoModes.MIMO2x2, 43 LteSimulation.MimoMode.MIMO_4x4: cmx500.MimoModes.MIMO4x4, 44} 45 46 47class ConfigurationMode(Enum): 48 Power = "Power" 49 50 51class CMX500CellularSimulator(cc.AbstractCellularSimulator): 52 """ A cellular simulator for telephony simulations based on the CMX 500 53 controller. """ 54 55 # The maximum power that the equipment is able to transmit 56 MAX_DL_POWER = -25 57 58 def __init__(self, 59 ip_address, 60 port='5025', 61 config_mode=None): 62 """ Initializes the cellular simulator. 63 64 Args: 65 ip_address: the ip address of the CMX500 66 port: the port number for the CMX500 controller 67 config_mode: A pre-defined configuration mode to use. 68 """ 69 super().__init__() 70 try: 71 self.cmx = cmx500.Cmx500(ip_address, port) 72 except: 73 raise cc.CellularSimulatorError('Error when Initializes CMX500.') 74 75 self._config_mode = config_mode 76 self.bts = self.cmx.bts 77 78 def destroy(self): 79 """ Sends finalization commands to the cellular equipment and closes 80 the connection. """ 81 self.log.info('destroy the cmx500 simulator') 82 self.cmx.disconnect() 83 84 def set_config_mode(self, config_mode=None): 85 """Sets config mode for the cmx 500 simulator.""" 86 self._config_mode = config_mode 87 88 def configure_lte_bts(self, config, bts_index): 89 """ Commands the equipment to setup an LTE base station with the 90 required configuration except drx. 91 92 Args: 93 config: an LteSimulation.BtsConfig object. 94 bts_index: the base station number. 95 """ 96 self.configure_lte_bts_base(config, bts_index) 97 98 def configure_lte_bts_after_started(self, config, bts_index): 99 100 if config.drx_connected_mode: 101 self.set_cdrx_config(bts_index, config) 102 if config.disable_all_ul_subframes: 103 self.bts[bts_index].disable_all_ul_subframes() 104 self.log.info( 105 'The radio connectivity after lte started config is {}'.format( 106 self.cmx.dut.state.radio_connectivity)) 107 108 def configure_nr_bts_after_started(self, config, bts_index): 109 if config.drx_connected_mode: 110 self.set_cdrx_config(bts_index, config) 111 if config.config_flexible_slots: 112 self.bts[bts_index].config_flexible_slots() 113 if config.disable_all_ul_slots: 114 self.bts[bts_index].disable_all_ul_slots() 115 self.log.info( 116 'The radio connectivity after nr started config is {}'.format( 117 self.cmx.dut.state.radio_connectivity)) 118 119 def setup_lte_scenario(self): 120 """ Configures the equipment for an LTE simulation. """ 121 self.log.info('setup lte scenario') 122 self.cmx.switch_lte_signalling(cmx500.LteState.LTE_ON) 123 124 def setup_nr_sa_scenario(self): 125 """ Configures the equipment for an NR stand alone simulation. """ 126 raise NotImplementedError() 127 128 def setup_nr_nsa_scenario(self): 129 """ Configures the equipment for an NR non stand alone simulation. """ 130 self.log.info('setup nsa scenario (start lte cell and nr cell') 131 self.cmx.switch_on_nsa_signalling() 132 133 def set_band_combination(self, bands, mimo_modes): 134 """ Prepares the test equipment for the indicated band/mimo combination. 135 136 Args: 137 bands: a list of bands represented as ints or strings 138 mimo_modes: a list of LteSimulation.MimoMode to use for each carrier 139 """ 140 self.num_carriers = len(bands) 141 142 # Don't configure secondary cells if we're using the built-in power 143 # configuration. 144 if self._config_mode != ConfigurationMode.Power: 145 self.cmx.set_band_combination(bands) 146 147 def set_lte_rrc_state_change_timer(self, enabled, time=10): 148 """ Configures the LTE RRC state change timer. 149 150 Args: 151 enabled: a boolean indicating if the timer should be on or off. 152 time: time in seconds for the timer to expire 153 """ 154 self.log.info('set timer enabled to {} and the time to {}'.format( 155 enabled, time)) 156 self.cmx.rrc_state_change_time_enable = enabled 157 self.cmx.lte_rrc_state_change_timer = time 158 159 def set_band(self, bts_index, band): 160 """ Sets the band for the indicated base station. 161 162 Args: 163 bts_index: the base station number 164 band: the new band 165 """ 166 self.log.info('set band to {}'.format(band)) 167 self.bts[bts_index].set_band(int(band)) 168 169 def set_cdrx_config(self, bts_index, config): 170 """ Sets the tdd configuration number for the indicated base station. 171 172 Args: 173 bts_index: the base station number 174 config: the config including cdrx parameters 175 """ 176 self.log.info('set cdrx config for bts {} to {}'.format( 177 bts_index, config) 178 ) 179 self.bts[bts_index].set_cdrx_config(config) 180 181 def get_duplex_mode(self, band): 182 """ Determines if the band uses FDD or TDD duplex mode 183 184 Args: 185 band: a band number 186 187 Returns: 188 an variable of class DuplexMode indicating if band is FDD or TDD 189 """ 190 if 33 <= int(band) <= 46: 191 return cmx500.DuplexMode.TDD 192 else: 193 return cmx500.DuplexMode.FDD 194 195 def set_input_power(self, bts_index, input_power): 196 """ Sets the input power for the indicated base station. 197 198 Args: 199 bts_index: the base station number 200 input_power: the new input power 201 """ 202 if input_power > 23: 203 self.log.warning('Open loop supports -50dBm to 23 dBm. ' 204 'Setting it to max power 23 dBm') 205 input_power = 23 206 self.log.info('set input power to {}'.format(input_power)) 207 self.bts[bts_index].set_ul_power(input_power) 208 209 def set_output_power(self, bts_index, output_power): 210 """ Sets the output power for the indicated base station. 211 212 Args: 213 bts_index: the base station number 214 output_power: the new output power 215 """ 216 self.log.info('set output power to {}'.format(output_power)) 217 self.bts[bts_index].set_dl_power(output_power) 218 219 def set_tdd_config(self, bts_index, tdd_config): 220 """ Sets the tdd configuration number for the indicated base station. 221 222 Args: 223 bts_index: the base station number 224 tdd_config: the new tdd configuration number (from 0 to 6) 225 """ 226 self.log.info('set tdd config to {}'.format(tdd_config)) 227 self.bts[bts_index].set_tdd_config(tdd_config) 228 229 def set_ssf_config(self, bts_index, ssf_config): 230 """ Sets the Special Sub-Frame config number for the indicated 231 base station. 232 233 Args: 234 bts_index: the base station number 235 ssf_config: the new ssf config number (from 0 to 9) 236 """ 237 self.log.info('set ssf config to {}'.format(ssf_config)) 238 self.bts[bts_index].set_ssf_config(ssf_config) 239 240 def set_bandwidth(self, bts_index, bandwidth): 241 """ Sets the bandwidth for the indicated base station. 242 243 Args: 244 bts_index: the base station number 245 bandwidth: the new bandwidth in MHz 246 """ 247 self.log.info('set bandwidth of bts {} to {}'.format( 248 bts_index, bandwidth)) 249 self.bts[bts_index].set_bandwidth(int(bandwidth)) 250 251 def set_downlink_channel_number(self, bts_index, channel_number): 252 """ Sets the downlink channel number for the indicated base station. 253 254 Args: 255 bts_index: the base station number 256 channel_number: the new channel number (earfcn) 257 """ 258 self.log.info( 259 'Sets the downlink channel number to {}'.format(channel_number)) 260 self.bts[bts_index].set_dl_channel(channel_number) 261 262 def set_mimo_mode(self, bts_index, mimo_mode): 263 """ Sets the mimo mode for the indicated base station. 264 265 Args: 266 bts_index: the base station number 267 mimo_mode: the new mimo mode 268 """ 269 self.log.info('set mimo mode to {}'.format(mimo_mode)) 270 mimo_mode = CMX_MIMO_MAPPING[mimo_mode] 271 self.bts[bts_index].set_mimo_mode(mimo_mode) 272 273 def set_transmission_mode(self, bts_index, tmode): 274 """ Sets the transmission mode for the indicated base station. 275 276 Args: 277 bts_index: the base station number 278 tmode: the new transmission mode 279 """ 280 self.log.info('set TransmissionMode to {}'.format(tmode)) 281 tmode = CMX_TM_MAPPING[tmode] 282 self.bts[bts_index].set_transmission_mode(tmode) 283 284 def set_scheduling_mode(self, 285 bts_index, 286 scheduling, 287 mcs_dl=None, 288 mcs_ul=None, 289 nrb_dl=None, 290 nrb_ul=None): 291 """ Sets the scheduling mode for the indicated base station. 292 293 Args: 294 bts_index: the base station number. 295 scheduling: the new scheduling mode. 296 mcs_dl: Downlink MCS. 297 mcs_ul: Uplink MCS. 298 nrb_dl: Number of RBs for downlink. 299 nrb_ul: Number of RBs for uplink. 300 """ 301 if scheduling not in CMX_SCH_MAPPING: 302 raise cc.CellularSimulatorError( 303 "This scheduling mode is not supported") 304 log_list = [] 305 if mcs_dl: 306 log_list.append('mcs_dl: {}'.format(mcs_dl)) 307 if mcs_ul: 308 log_list.append('mcs_ul: {}'.format(mcs_ul)) 309 if nrb_dl: 310 log_list.append('nrb_dl: {}'.format(nrb_dl)) 311 if nrb_ul: 312 log_list.append('nrb_ul: {}'.format(nrb_ul)) 313 314 self.log.info('set scheduling mode to {}'.format(','.join(log_list))) 315 self.bts[bts_index].set_scheduling_mode(mcs_dl=mcs_dl, 316 mcs_ul=mcs_ul, 317 nrb_dl=nrb_dl, 318 nrb_ul=nrb_ul) 319 320 def set_dl_256_qam_enabled(self, bts_index, enabled): 321 """ Determines what MCS table should be used for the downlink. 322 323 Args: 324 bts_index: the base station number 325 enabled: whether 256 QAM should be used 326 """ 327 self.log.info('Set 256 QAM DL MCS enabled: ' + str(enabled)) 328 self.bts[bts_index].set_dl_modulation_table( 329 cmx500.ModulationType.Q256 if enabled else cmx500.ModulationType. 330 Q64) 331 332 def set_ul_64_qam_enabled(self, bts_index, enabled): 333 """ Determines what MCS table should be used for the uplink. 334 335 Args: 336 bts_index: the base station number 337 enabled: whether 64 QAM should be used 338 """ 339 self.log.info('Set 64 QAM UL MCS enabled: ' + str(enabled)) 340 self.bts[bts_index].set_ul_modulation_table( 341 cmx500.ModulationType.Q64 if enabled else cmx500.ModulationType.Q16 342 ) 343 344 def set_mac_padding(self, bts_index, mac_padding): 345 """ Enables or disables MAC padding in the indicated base station. 346 347 Args: 348 bts_index: the base station number 349 mac_padding: the new MAC padding setting 350 """ 351 self.log.info('set mac pad on {}'.format(mac_padding)) 352 self.bts[bts_index].set_dl_mac_padding(mac_padding) 353 354 def set_cfi(self, bts_index, cfi): 355 """ Sets the Channel Format Indicator for the indicated base station. 356 357 Args: 358 bts_index: the base station number 359 cfi: the new CFI setting 360 """ 361 if cfi == 'BESTEFFORT': 362 self.log.info('The cfi is BESTEFFORT, use default value') 363 return 364 try: 365 index = int(cfi) + 1 366 except Exception as e: 367 index = 1 368 finally: 369 self.log.info('set the cfi and the cfi index is {}'.format(index)) 370 self.bts[bts_index].set_cfi(index) 371 372 def set_paging_cycle(self, bts_index, cycle_duration): 373 """ Sets the paging cycle duration for the indicated base station. 374 375 Args: 376 bts_index: the base station number 377 cycle_duration: the new paging cycle duration in milliseconds 378 """ 379 self.log.warning('The set_paging_cycle method is not implememted, ' 380 'use default value') 381 382 def set_phich_resource(self, bts_index, phich): 383 """ Sets the PHICH Resource setting for the indicated base station. 384 385 Args: 386 bts_index: the base station number 387 phich: the new PHICH resource setting 388 """ 389 self.log.warning('The set_phich_resource method is not implememted, ' 390 'use default value') 391 392 def set_tracking_area(self, bts_index, tac): 393 """ Assigns the cell to a specific tracking area. 394 395 Args: 396 tac: the unique tac to assign the cell to. 397 """ 398 self.bts[bts_index].set_tracking_area(tac) 399 400 def lte_attach_secondary_carriers(self, ue_capability_enquiry): 401 """ Activates the secondary carriers for CA. Requires the DUT to be 402 attached to the primary carrier first. 403 404 Args: 405 ue_capability_enquiry: UE capability enquiry message to be sent to 406 the UE before starting carrier aggregation. 407 """ 408 self.wait_until_communication_state() 409 410 # primary cell is attached, now turn on all secondary cells 411 if self.cmx.secondary_cells: 412 self.cmx.turn_on_secondary_cells() 413 414 # if a primary lte and primary nr cell exist, then activate endc on 415 # primary nr cell 416 is_nsa = self.cmx.primary_lte_cell and self.cmx.primary_nr_cell 417 418 if is_nsa: 419 self.cmx.primary_nr_cell.activate_endc() 420 421 # attach secondary lte and nr cells 422 # if nsa then nr cells should be added to secondary cell group 423 for bts in self.cmx.secondary_lte_cells: 424 bts.attach_as_secondary_cell() 425 426 for bts in self.cmx.secondary_nr_cells: 427 bts.attach_as_secondary_cell(scg=is_nsa) 428 429 if self._config_mode and self._config_mode == ConfigurationMode.Power: 430 self.configure_for_power_measurement() 431 432 self.log.info('The radio connectivity is {}'.format( 433 self.cmx.dut.state.radio_connectivity)) 434 435 def configure_for_power_measurement(self): 436 """ Applies a pre-defined configuration for PDCCH power testing.""" 437 self.log.info('set lte cdrx for nr nsa scenario') 438 for bts in self.cmx.lte_cells: 439 bts.set_default_cdrx_config() 440 time.sleep(5) 441 442 self.log.info('Disables mac padding') 443 for bts in self.bts: 444 bts.set_dl_mac_padding(False) 445 time.sleep(5) 446 447 self.log.info('configure flexible slots and wait for 5 seconds') 448 for bts in self.cmx.nr_cells: 449 bts.config_flexible_slots() 450 time.sleep(5) 451 452 self.log.info('disable all ul subframes of the lte cell') 453 for bts in self.cmx.lte_cells: 454 bts.disable_all_ul_subframes() 455 time.sleep(30) 456 457 self.log.info('Disables Nr UL slots') 458 for bts in self.cmx.nr_cells: 459 bts.disable_all_ul_slots() 460 time.sleep(5) 461 462 def wait_until_attached(self, timeout=120): 463 """ Waits until the DUT is attached to the primary carrier. 464 465 Args: 466 timeout: after this amount of time the method will raise a 467 CellularSimulatorError exception. Default is 120 seconds. 468 """ 469 self.log.info('wait until attached') 470 if len(self.cmx.tracking_areas) > 1: 471 self.log.info('turning off neighbor cells') 472 self.cmx.turn_on_primary_cells() 473 self.cmx.turn_off_neighbor_cells() 474 475 self.cmx.wait_until_attached(timeout) 476 477 def wait_until_communication_state(self, timeout=120): 478 """ Waits until the DUT is in Communication state. 479 480 Args: 481 timeout: after this amount of time the method will raise a 482 CellularSimulatorError exception. Default is 120 seconds. 483 Return: 484 True if cmx reach rrc state within timeout 485 Raise: 486 CmxError if tiemout 487 """ 488 self.log.info('wait for rrc on state') 489 return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_ON, timeout) 490 491 def wait_until_idle_state(self, timeout=120): 492 """ Waits until the DUT is in Idle state. 493 494 Args: 495 timeout: after this amount of time the method will raise a 496 CellularSimulatorError exception. Default is 120 seconds. 497 Return: 498 True if cmx reach rrc state within timeout 499 Raise: 500 CmxError if tiemout 501 """ 502 self.log.info('wait for rrc off state') 503 return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_OFF, timeout) 504 505 def wait_until_quiet(self, timeout=120): 506 """Waits for all pending operations to finish on the simulator. 507 508 Args: 509 timeout: after this amount of time the method will raise a 510 CellularSimulatorError exception. Default is 120 seconds. 511 """ 512 self.cmx.network_apply_changes() 513 514 def detach(self): 515 """ Turns off all the base stations so the DUT loose connection.""" 516 self.log.info('Bypass simulator detach step for now') 517 518 def stop(self): 519 """ Stops current simulation. After calling this method, the simulator 520 will need to be set up again. """ 521 self.log.info('Stops current simulation and disconnect cmx500') 522 self.cmx.disconnect() 523 524 def start_data_traffic(self): 525 """ Starts transmitting data from the instrument to the DUT. """ 526 self.log.warning('The start_data_traffic is not implemented yet') 527 528 def stop_data_traffic(self): 529 """ Stops transmitting data from the instrument to the DUT. """ 530 self.log.warning('The stop_data_traffic is not implemented yet') 531 532 def send_sms(self, message): 533 """ Sends an SMS message to the DUT. 534 535 Args: 536 message: the SMS message to send. 537 """ 538 self.cmx.send_sms(message) 539