1#!/usr/bin/env python3
2#
3# Copyright (C) 2021 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Repeatedly install an A/B update to an Android device over adb."""
19
20import argparse
21import sys
22from pathlib import Path
23import subprocess
24import signal
25
26
def CleanupLoopDevices():
  """Run `losetup -D` via `adb shell su 0` to clean up unused loop devices.

  Workaround for b/184716804.

  Raises:
    subprocess.CalledProcessError: the adb command exited with non-zero status
  """
  losetup_cmd = ["adb", "shell", "su", "0"]
  losetup_cmd += ["losetup", '-D']
  subprocess.check_call(losetup_cmd)
30
31
def CancelOTA():
  """Run `update_engine_client --cancel` on the device via `adb shell su 0`.

  The command is issued with subprocess.call, so its exit status is ignored
  and a non-zero status does not raise.
  """
  adb_root_shell = ["adb", "shell", "su", "0"]
  cancel_cmd = adb_root_shell + ["update_engine_client", "--cancel"]
  subprocess.call(cancel_cmd)
35
36
def PerformOTAThenPause(otafile: Path, update_device_script: Path,
                        pause_after: float = 10):
  """Start an OTA with update_device.py, then interrupt it after a delay.

  The script is launched with --no-postinstall and --no-slot-switch. If it is
  still running after `pause_after` seconds, SIGINT is sent to its direct
  children (via pkill -P) and to the script itself, and the script is reaped.

  Args:
    otafile: Path to the OTA package passed to the update script
    update_device_script: Path to the update_device.py script to run
    pause_after: Seconds to let the update run before interrupting it

  Return:
    None, both when the update was interrupted and when it finished early

  Raises:
    RuntimeError: the script exited with non-zero status before the delay
    subprocess.CalledProcessError: pkill itself failed (exit status other
      than 0 "matched" or 1 "no process matched")
  """
  ota_cmd = [sys.executable, str(update_device_script), str(otafile),
             "--no-postinstall", "--no-slot-switch"]
  p = subprocess.Popen(ota_cmd)
  try:
    ret = p.wait(pause_after)
  except subprocess.TimeoutExpired:
    # Expected path: the update is still in progress and must be interrupted.
    ret = None
  if ret is not None:
    if ret != 0:
      raise RuntimeError("OTA failed to apply")
    print("OTA finished early? Surprise.")
    return
  print(f"Killing {p.pid}")
  pkill_cmd = ["pkill", "-INT", "-P", str(p.pid)]
  try:
    # pkill exits with 1 when the script currently has no child process. That
    # is not an error here, so only real pkill failures are reported.
    pkill_status = subprocess.call(pkill_cmd)
    if pkill_status not in (0, 1):
      raise subprocess.CalledProcessError(pkill_status, pkill_cmd)
  finally:
    # Always interrupt and reap the script itself, even if pkill failed, so
    # no update process is left running behind the test.
    p.send_signal(signal.SIGINT)
    p.wait()
56
57
def PerformTest(otafile: Path, resumes: int, timeout: int):
  """Install an OTA to device, raising exceptions on failure

  The update is started and interrupted `resumes` times with
  PerformOTAThenPause. The update is then cancelled, loop devices are cleaned
  up, and update_device.py (expected next to this file) is run one more time
  and left to finish. Afterwards the update is cancelled again, the
  update_engine status is reset and loop devices are cleaned up once more.

  Args:
    otafile: Path to the ota.zip to install
    resumes: Number of times to start the update and interrupt it before the
      final run
    timeout: Seconds to wait for the final update_device.py run to finish

  Return:
    None if no error, if there's an error exception will be thrown

  Raises:
    FileNotFoundError: otafile or update_device.py does not exist
    subprocess.TimeoutExpired: the final run exceeded `timeout` seconds
    subprocess.CalledProcessError: the final run or an adb command exited
      with non-zero status
    RuntimeError: the final run's output lacks the success marker
  """
  # Explicit checks instead of assert, which is stripped under `python -O`.
  if not otafile.exists():
    raise FileNotFoundError(f"OTA package not found: {otafile}")
  print("Applying", otafile)
  script_dir = Path(__file__).parent.absolute()
  update_device_script = script_dir / "update_device.py"
  if not update_device_script.exists():
    raise FileNotFoundError(
        f"update_device.py not found: {update_device_script}")
  print(update_device_script)
  python = sys.executable

  for i in range(resumes):
    print("Pause/Resume for the", i+1, "th time")
    PerformOTAThenPause(otafile, update_device_script)
  CancelOTA()
  CleanupLoopDevices()

  ota_cmd = [python, str(update_device_script),
             str(otafile), "--no-postinstall"]
  print("Finishing OTA Update", ota_cmd)
  result = subprocess.run(
      ota_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
      timeout=timeout)
  # Device logs are not guaranteed to be valid UTF-8; never fail on decoding.
  output = result.stdout.decode(errors="replace")
  # Print the log before checking the exit status so that a failing run is
  # still diagnosable from the stress test's own output.
  print(output)
  result.check_returncode()
  if "onPayloadApplicationComplete(ErrorCode::kSuccess" not in output:
    raise RuntimeError("Failed to finish OTA")
  CancelOTA()
  subprocess.check_call(
      ["adb", "shell", "su", "0", "update_engine_client", "--reset_status"])
  CleanupLoopDevices()
94
95
def main():
  """Parse the command line and run PerformTest the requested number of times.

  The parsed arguments are printed first. The loop counts --iterations down
  and stops only when the counter reaches exactly 0, so 0 runs nothing and a
  negative value (such as -1) keeps running until interrupted.
  """
  arg_parser = argparse.ArgumentParser(
      description='Android A/B OTA stress test helper.')
  arg_parser.add_argument(
      'otafile', metavar='PAYLOAD', type=Path,
                      help='the OTA package file (a .zip file) or raw payload \
                      if device uses Omaha.')
  arg_parser.add_argument(
      '-n', "--iterations", type=int, default=10, metavar='ITERATIONS',
                      help='The number of iterations to run the stress test, or\
                       -1 to keep running until CTRL+C')
  arg_parser.add_argument(
      '-r', "--resumes", type=int, default=5, metavar='RESUMES',
                      help='The number of iterations to pause the update when \
                        installing')
  arg_parser.add_argument(
      '-t', "--timeout", type=int, default=60*60, metavar='TIMEOUTS',
                      help='Timeout, in seconds, when waiting for OTA to \
                        finish')
  options = arg_parser.parse_args()
  print(options)

  remaining = options.iterations
  while remaining:
    PerformTest(options.otafile, options.resumes, options.timeout)
    remaining -= 1
119
120
if __name__ == "__main__":
  # Run the stress test only when executed as a script, not when imported.
  main()
123