1#!/usr/bin/env python3
2#
3#   Copyright 2022 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import datetime
18import enum
19import logging
20import time
21from typing import Callable
22
23from acts import asserts
24from acts.controllers.amarisoft_lib import amarisoft_client
25from acts.controllers.amarisoft_lib import config_utils
26from acts.controllers.amarisoft_lib import ssh_utils
27from acts.controllers.amarisoft_lib import ims
28from acts.controllers.amarisoft_lib import mme
29from acts.test_decorators import test_tracker_info
30from acts_contrib.test_utils.tel import tel_defines
31from acts_contrib.test_utils.tel.tel_phone_setup_utils import phone_setup_volte
32from acts.libs.proc import job
33from acts_contrib.test_utils.tel.TelephonyBaseTest import TelephonyBaseTest
34
35PWS_ALERT_4370 = 4370
36PWS_ALERT_4371 = 4371
37PWS_ALERT_4380 = 4380
38PWS_ALERT_911 = 911
39PWS_ALERT_4383 = 4383
40PWS_ALERT_4384 = 4384
41PWS_ALERT_4393 = 4393
42PWS_ALERT_919 = 919
43
44PREFERENCES_XML_FILENAME = '/data/user_de/0/com.google.android.cellbroadcastreceiver/shared_prefs/com.google.android.cellbroadcastreceiver_preferences.xml'
45ENABLE_TEST_ALERT_CMD = (
46    "sed -i 's/"
47    "enable_test_alerts\\\" value=\\\"false/"
48    "enable_test_alerts\\\" value=\\\"true/"
49    f"' {PREFERENCES_XML_FILENAME}")
50PWS_DUPLICATE_DETECTION_OFF = (
51    'am broadcast -a '
52    'com.android.cellbroadcastservice.action.DUPLICATE_DETECTION '
53    '--ez enable false')
54
55IN_CALL_DURATION = datetime.timedelta(seconds=10)
56CHECK_INTERVAL = datetime.timedelta(seconds=1)
57SERVICE_RESTART_TIME_OUT = datetime.timedelta(seconds=10)
58REGISTRATION_TIMEOUT = datetime.timedelta(seconds=120)
59WAIT_CALL_STATE_TIMEOUT = datetime.timedelta(seconds=30)
60PWS_START_END_INTERVAL = datetime.timedelta(seconds=15)
61
62
63class TestScenario(enum.Enum):
64  """Test scenario for PWS test."""
65  PS = 0
66  CS = 1
67  IDLE = 2
68
69
70class CallState(enum.Enum):
71  """Telephony call state."""
72  IDLE = 0
73  RINGING = 1
74  OFFHOOK = 2
75
76
77def wait_until(condition: Callable[..., bool], interval: datetime.timedelta,
78               timeout: datetime.timedelta, ret: bool, *argv) -> bool:
79  """Waits for the condition to occur.
80
81  Args:
82    condition: Function to check specific event occur or not.
83    interval: Time period during each check.
84    timeout: A timer which wait for event occur.
85    ret: Expected result of condition.
86    *argv: Parameters used by condition.
87
88  Returns:
89    True if condition match ret, False otherwise.
90  """
91  start_time = datetime.datetime.now()
92  while datetime.datetime.now() - start_time < timeout:
93    if condition(*argv) == ret:
94      return True
95    time.sleep(interval.total_seconds())
96  return False
97
98
99def is_in_service(ad) -> bool:
100  """Checks radio service state of android device .
101
102  Args:
103    ad: Mobly's Android controller objects.
104
105  Returns:
106    True if device is in service, False otherwise.
107  """
108  service_state = ad.droid.telephonyGetServiceState()
109  if service_state is None:
110    return False
111  return service_state.get('serviceState') == 'IN_SERVICE'
112
113
114class TelLabPwsTest(TelephonyBaseTest):
115
116  def setup_class(self):
117    super().setup_class()
118    self.ad = self.android_devices[0]
119    self.ad.info = self.user_params.get('AndroidDevice')[0]
120    self.amarisoft_ip_address = self.user_params.get('amarisoft_ip_address')
121    self.amarisoft_username = self.user_params.get('amarisoft_username')
122    self.amarisoft_pw = self.user_params.get('amarisoft_pw')
123    self.amarisoft_call_num = self.user_params.get('amarisoft_call_num')
124    self.remote = amarisoft_client.AmariSoftClient(self.amarisoft_ip_address,
125                                                   self.amarisoft_username,
126                                                   self.amarisoft_pw)
127    self.remote.connect()
128    self.config = config_utils.ConfigUtils(self.remote)
129    self.mme = mme.MmeFunctions(self.remote)
130    self.ims = ims.ImsFunctions(self.remote)
131    self._amarisoft_preset()
132    self._android_device_preset()
133
134  def _amarisoft_preset(self) -> None:
135    """Sets Amarisoft test network."""
136
137    if not self.remote.ssh_is_connected():
138      raise ssh_utils.NotConnectedError(
139          'amarisoft_preset: amarisoft is not connected.')
140    self.remote.lte_service_start()
141
142    asserts.skip_if(
143        not self.config.upload_enb_template(config_utils.EnbCfg.ENB_GENERIC),
144        'amarisoft_preset: Failed to upload enb configuration.')
145    asserts.skip_if(
146        not self.config.upload_mme_template(config_utils.MmeCfg.MME_GENERIC),
147        'amarisoft_preset: Failed to upload mme configuration.')
148    asserts.skip_if(
149        not self.config.enb_set_plmn('46697'),
150        'amarisoft_preset: Failed to set ENB PLMN.')
151    asserts.skip_if(
152        not self.config.mme_set_plmn('46697'),
153        'amarisoft_preset: Failed to set MME PLMN.')
154    asserts.skip_if(
155        not self.config.enb_set_spectrum_tech(config_utils.SpecTech.FDD.value),
156        'amarisoft_preset: Failed to set ENB spectrum technique.')
157    asserts.skip_if(
158        not self.config.enb_set_fdd_arfcn(275),
159        'amarisoft_preset: Failed to set ENB FDD ARFCN.')
160
161    self.remote.lte_service_restart()
162    start_time = datetime.datetime.now()
163    while not self.remote.lte_service_is_active():
164      if datetime.datetime.now() - start_time > SERVICE_RESTART_TIME_OUT:
165        asserts.fail('amarisoft_preset: Amarisoft service restart failed.')
166      else:
167        time.sleep(CHECK_INTERVAL)
168    self.log.info('Amarisoft preset completed.')
169
170  def _android_device_preset(self)->None:
171    """Presets the device before the test starts."""
172
173    self.log.info('Android device preset start.')
174    self.ad.droid.connectivityToggleAirplaneMode(False)
175    asserts.skip_if(
176        not wait_until(is_in_service, CHECK_INTERVAL, REGISTRATION_TIMEOUT,
177                       True, self.ad), 'android_device_preset: '
178        f'{self.ad.serial} is still out of service after airplane mode off.')
179    self.ad.droid.toggleRingerSilentMode(False)
180    self.ad.adb.shell(ENABLE_TEST_ALERT_CMD)
181    self.ad.reboot()
182    self.ad.droid.setMediaVolume(3)
183    self.ad.droid.setRingerVolume(3)
184    self.ad.droid.setVoiceCallVolume(3)
185    self.ad.droid.setAlarmVolume(3)
186    asserts.assert_true(
187        phone_setup_volte(self.log, self.ad),
188        'android_device_preset: Failed to set up VoLTE.')
189    self.log.info('Android device preset completed.')
190
191  def mo_call_to_amarisoft(self) -> None:
192    """Executes a MO call process including checking the call status during the MO call.
193
194      The method focus on if any issue found on MO side with below steps:
195      (1) Make a voice call from MO side to MT side(Amarisoft).
196      (2) MT side accepts the call.
197      (3) Check if the call is connect.
198      (4) Monitor the in-call status for MO side during in-call duration.
199      (5) End the call on MO side.
200    """
201    if not self.ad.droid.telephonyIsImsRegistered():
202      asserts.skip(
203          'mo_call_process: No IMS registered, cannot perform VoLTE call test.')
204    self.ad.log.info('Dial a Call to callbox.')
205    self.ad.droid.telecomCallNumber(self.amarisoft_call_num, False)
206    asserts.assert_true(
207        wait_until(self.ad.droid.telecomGetCallState, CHECK_INTERVAL,
208                   WAIT_CALL_STATE_TIMEOUT, CallState.OFFHOOK.name),
209        'mo_call_process: The call is not connected.')
210    asserts.assert_false(
211        wait_until(self.ad.droid.telecomIsInCall, CHECK_INTERVAL,
212                   IN_CALL_DURATION, False),
213        'mo_call_process: UE drop call before end call.')
214    self.ad.droid.telecomEndCall()
215    asserts.assert_true(
216        wait_until(self.ad.droid.telecomGetCallState, CHECK_INTERVAL,
217                   WAIT_CALL_STATE_TIMEOUT, CallState.IDLE.name),
218        'mo_call_process: UE is still in-call after hanging up the call.')
219
220  def pws_action(self, msg: str, test_scenario: int) -> None:
221    """Performs a PWS broadcast and check android device receives PWS message.
222
223    (1) Device idle or perform mo call/ping test according to test scenario.
224    (2) Broadcast a specific PWS message.
225    (3) Wait 15 seconds for device receive PWS message.
226    (4) Stop broadcast PWS message.
227    (5) Verify android device receive PWS message by check keywords in logcat.
228    (6) Perform mo call/ping test according to test scenario.
229
230    Args:
231      msg: The PWS parameter to be broadcast.
232      test_scenario: The parameters of the test scenario to be executed.
233    """
234    if test_scenario == TestScenario.PS:
235      job.run(f'adb -s {self.ad.serial} shell ping -c 5 8.8.8.8')
236    elif test_scenario == TestScenario.CS:
237      self.mo_call_to_amarisoft()
238
239    logging.info('Broadcast PWS: %s', msg)
240    # Advance the start time by one second to avoid loss of logs
241    # due to time differences between test device and mobileharness.
242    start_time = datetime.datetime.now() - datetime.timedelta(seconds=1)
243    self.mme.pws_write(msg)
244    time.sleep(PWS_START_END_INTERVAL.seconds)
245    self.mme.pws_kill(msg)
246
247    asserts.assert_true(
248        self.ad.search_logcat(
249            f'CBChannelManager: isEmergencyMessage: true, message id = {msg}',
250            start_time), f'{msg} not received.')
251    asserts.assert_false(
252        self.ad.search_logcat('Failed to play alert sound', start_time),
253        f'{msg} failed to play alert sound.')
254
255    if msg in [PWS_ALERT_911, PWS_ALERT_919]:
256      asserts.assert_true(
257          self.ad.search_logcat('playAlertTone: alertType=INFO', start_time),
258          f'{msg} alertType not match expected (alertType=INFO).')
259    else:
260      asserts.assert_true(
261          self.ad.search_logcat('playAlertTone: alertType=DEFAULT', start_time),
262          f'{msg} alertType not match expected (alertType=DEFAULT).')
263
264    if test_scenario == TestScenario.PS:
265      job.run(f'adb -s {self.ad.serial} shell ping -c 5 8.8.8.8')
266    elif test_scenario == TestScenario.CS:
267      self.mo_call_to_amarisoft()
268
269  def teardown_test(self):
270    self.ad.adb.shell(PWS_DUPLICATE_DETECTION_OFF)
271    super().teardown_test()
272
273  def teardown_class(self):
274    self.ad.droid.connectivityToggleAirplaneMode(True)
275    super().teardown_class()
276
277  @test_tracker_info(uuid="f8971b34-fcaa-4915-ba05-36c754378987")
278  def test_pws_idle_4370(self):
279    self.pws_action(PWS_ALERT_4370, TestScenario.IDLE)
280
281  @test_tracker_info(uuid="ed925410-646f-475a-8765-44ea1631cc6a")
282  def test_pws_idle_4371(self):
283    self.pws_action(PWS_ALERT_4371, TestScenario.IDLE)
284
285  @test_tracker_info(uuid="253f2e2e-8262-43b5-a66e-65b2bc73df58")
286  def test_pws_idle_4380(self):
287    self.pws_action(PWS_ALERT_4380, TestScenario.IDLE)
288
289  @test_tracker_info(uuid="95ed6407-3c5b-4f58-9fd9-e5021972f03c")
290  def test_pws_idle_911(self):
291    self.pws_action(PWS_ALERT_911, TestScenario.IDLE)
292
293  @test_tracker_info(uuid="a6f76e03-b808-4194-b286-54a2ca02cb7f")
294  def test_pws_idle_4383(self):
295    self.pws_action(PWS_ALERT_4383, TestScenario.IDLE)
296
297  @test_tracker_info(uuid="8db4be15-2e2c-4616-8f7f-a6b8062d7265")
298  def test_pws_idle_4384(self):
299    self.pws_action(PWS_ALERT_4384, TestScenario.IDLE)
300
301  @test_tracker_info(uuid="79ba63d7-8ffb-48d3-b27e-a8b152ee5a25")
302  def test_pws_idle_4393(self):
303    self.pws_action(PWS_ALERT_4393, TestScenario.IDLE)
304
305  @test_tracker_info(uuid="a07b1c14-dd3f-4818-bc8d-120d006dcea5")
306  def test_pws_idle_919(self):
307    self.pws_action(PWS_ALERT_919, TestScenario.IDLE)
308
309  @test_tracker_info(uuid="00b607a9-e75c-4342-9c7f-9528704ae3bd")
310  def test_pws_ps_4370(self):
311    self.pws_action(PWS_ALERT_4370, TestScenario.PS)
312
313  @test_tracker_info(uuid="feff8d7a-52fe-46f0-abe5-0da698fc985c")
314  def test_pws_ps_4371(self):
315    self.pws_action(PWS_ALERT_4371, TestScenario.PS)
316
317  @test_tracker_info(uuid="22afaaa1-7738-4499-a378-eabb9ae19fa6")
318  def test_pws_ps_4380(self):
319    self.pws_action(PWS_ALERT_4380, TestScenario.PS)
320
321  @test_tracker_info(uuid="d6fb35fa-9058-4c90-ac8d-bc49d6be1070")
322  def test_pws_ps_911(self):
323    self.pws_action(PWS_ALERT_911, TestScenario.PS)
324
325  @test_tracker_info(uuid="9937c39f-4b47-47f4-904a-108123919716")
326  def test_pws_ps_4383(self):
327    self.pws_action(PWS_ALERT_4383, TestScenario.PS)
328
329  @test_tracker_info(uuid="01faa5bb-e02a-42a3-bf08-30e422c684f4")
330  def test_pws_ps_4384(self):
331    self.pws_action(PWS_ALERT_4384, TestScenario.PS)
332
333  @test_tracker_info(uuid="71d02b4a-a1a3-44e1-a28a-aea3a62f758f")
334  def test_pws_ps_4393(self):
335    self.pws_action(PWS_ALERT_4393, TestScenario.PS)
336
337  @test_tracker_info(uuid="f5e7801c-80e0-4cbe-b4b1-133fa88fa4a3")
338  def test_pws_ps_919(self):
339    self.pws_action(PWS_ALERT_919, TestScenario.PS)
340
341  @test_tracker_info(uuid="b68e5593-1748-434c-be2a-e684791f2ca8")
342  def test_pws_cs_4370(self):
343    self.pws_action(PWS_ALERT_4370, TestScenario.CS)
344
345  @test_tracker_info(uuid="a04f433d-bbf0-4a09-b958-719ec8df9991")
346  def test_pws_cs_4371(self):
347    self.pws_action(PWS_ALERT_4371, TestScenario.CS)
348
349  @test_tracker_info(uuid="48432d8d-847a-44e3-aa24-32ae704e15de")
350  def test_pws_cs_4380(self):
351    self.pws_action(PWS_ALERT_4380, TestScenario.CS)
352
353  @test_tracker_info(uuid="9fde76b2-e568-4aa5-a627-9d682ba9e1fb")
354  def test_pws_cs_911(self):
355    self.pws_action(PWS_ALERT_911, TestScenario.CS)
356
357  @test_tracker_info(uuid="fa1f0c6a-22af-4daf-ab32-a508b06de165")
358  def test_pws_cs_4383(self):
359    self.pws_action(PWS_ALERT_4383, TestScenario.CS)
360
361  @test_tracker_info(uuid="45d924be-e204-497d-b598-e18a8c668492")
362  def test_pws_cs_4384(self):
363    self.pws_action(PWS_ALERT_4384, TestScenario.CS)
364
365  @test_tracker_info(uuid="ff4f0e6e-2bda-4047-a69c-7b103868e2d5")
366  def test_pws_cs_4393(self):
367    self.pws_action(PWS_ALERT_4393, TestScenario.CS)
368
369  @test_tracker_info(uuid="ab2bd166-c5e0-4505-ba37-6192bf53226f")
370  def test_pws_cs_919(self):
371    self.pws_action(PWS_ALERT_919, TestScenario.CS)
372
373
374