1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Tools for running host side simulation of an OTA update."""
19
20
21import argparse
22import filecmp
23import os
24import shutil
25import subprocess
26import sys
27import tempfile
28import zipfile
29
30import update_payload
31
32
33def extract_file(zip_file_path, entry_name, target_file_path):
34  """Extract a file from zip archive into |target_file_path|"""
35  with open(target_file_path, 'wb') as out_fp:
36    if isinstance(zip_file_path, zipfile.ZipFile):
37      with zip_file_path.open(entry_name) as fp:
38        shutil.copyfileobj(fp, out_fp)
39    elif os.path.isdir(zip_file_path):
40      with open(os.path.join(zip_file_path, entry_name), "rb") as fp:
41        shutil.copyfileobj(fp, out_fp)
42
43
44def is_sparse_image(filepath):
45  with open(filepath, 'rb') as fp:
46    # Magic for android sparse image format
47    # https://source.android.com/devices/bootloader/images
48    return fp.read(4) == b'\x3A\xFF\x26\xED'
49
50
51def extract_img(zip_archive: zipfile.ZipFile, img_name, output_path, is_source):
52  """ Extract and unsparse partition image from zip archive """
53  entry_name = "IMAGES/" + img_name + ".img"
54  try:
55    extract_file(zip_archive, entry_name, output_path)
56  except (KeyError, FileNotFoundError) as e:
57    print("Faild to extract", img_name, "from IMAGES/ dir, trying RADIO/", e)
58    extract_file(zip_archive, "RADIO/" + img_name + ".img", output_path)
59  if is_sparse_image(output_path):
60    raw_img_path = output_path + ".raw"
61    subprocess.check_output(["simg2img", output_path, raw_img_path])
62    os.rename(raw_img_path, output_path)
63
64  # delta_generator only supports images multiple of 4 KiB. For target images
65  # we pad the data with zeros if needed, but for source images we truncate
66  # down the data since the last block of the old image could be padded on
67  # disk with unknown data.
68  file_size = os.path.getsize(output_path)
69  if file_size % 4096 != 0:
70    if is_source:
71      print("Rounding DOWN partition {} to a multiple of 4 KiB."
72            .format(output_path))
73      file_size = file_size & -4096
74    else:
75      print("Rounding UP partition {} to a multiple of 4 KiB."
76            .format(output_path))
77      file_size = (file_size + 4095) & -4096
78    with open(output_path, 'a') as f:
79      f.truncate(file_size)
80
81def run_ota(source, target, payload_path, tempdir, output_dir):
82  """Run an OTA on host side"""
83  payload = update_payload.Payload(payload_path)
84  payload.Init()
85  if source and zipfile.is_zipfile(source):
86    source = zipfile.ZipFile(source)
87  if target and zipfile.is_zipfile(target):
88    target = zipfile.ZipFile(target)
89  source_exist = source and (isinstance(
90      source, zipfile.ZipFile) or os.path.exists(source))
91  target_exist = target and (isinstance(
92      target, zipfile.ZipFile) or os.path.exists(target))
93
94  old_partitions = []
95  new_partitions = []
96  expected_new_partitions = []
97  for part in payload.manifest.partitions:
98    name = part.partition_name
99    old_image = os.path.join(tempdir, "source_" + name + ".img")
100    new_image = os.path.join(tempdir, "target_" + name + ".img")
101    if part.HasField("old_partition_info"):
102      assert source_exist, \
103          "source target file must point to a valid zipfile or directory " + \
104          source
105      print("Extracting source image for", name)
106      extract_img(source, name, old_image, True)
107    if target_exist:
108      print("Extracting target image for", name)
109      extract_img(target, name, new_image, False)
110
111    old_partitions.append(old_image)
112    scratch_image_name = new_image + ".actual"
113    new_partitions.append(scratch_image_name)
114    with open(scratch_image_name, "wb") as fp:
115      fp.truncate(part.new_partition_info.size)
116    expected_new_partitions.append(new_image)
117
118  delta_generator_args = ["delta_generator", "--in_file=" + payload_path]
119  partition_names = [
120      part.partition_name for part in payload.manifest.partitions
121  ]
122  if payload.manifest.partial_update:
123    delta_generator_args.append("--is_partial_update")
124  if payload.is_incremental:
125    delta_generator_args.append("--old_partitions=" + ":".join(old_partitions))
126  delta_generator_args.append("--partition_names=" + ":".join(partition_names))
127  delta_generator_args.append("--new_partitions=" + ":".join(new_partitions))
128
129  print("Running ", " ".join(delta_generator_args))
130  subprocess.check_output(delta_generator_args)
131
132  valid = True
133  if not target_exist:
134    for part in new_partitions:
135      print("Output written to", part)
136      shutil.copy(part, output_dir)
137    return
138  for (expected_part, actual_part, part_name) in \
139          zip(expected_new_partitions, new_partitions, partition_names):
140    if filecmp.cmp(expected_part, actual_part):
141      print("Partition `{}` is valid".format(part_name))
142    else:
143      valid = False
144      print(
145          "Partition `{}` is INVALID expected image: {} actual image: {}"
146          .format(part_name, expected_part, actual_part))
147
148  if not valid and sys.stdout.isatty():
149    input("Paused to investigate invalid partitions, press any key to exit.")
150
151
152def main():
153  parser = argparse.ArgumentParser(
154      description="Run host side simulation of OTA package")
155  parser.add_argument(
156      "--source",
157      help="Target file zip for the source build",
158      required=False)
159  parser.add_argument(
160      "--target",
161      help="Target file zip for the target build",
162      required=False)
163  parser.add_argument(
164      "-o",
165      dest="output_dir",
166      help="Output directory to put all images, current directory by default"
167  )
168  parser.add_argument(
169      "payload",
170      help="payload.bin for the OTA package, or a zip of OTA package itself",
171      nargs=1)
172  args = parser.parse_args()
173  print(args)
174
175  # pylint: disable=no-member
176  with tempfile.TemporaryDirectory() as tempdir:
177    payload_path = args.payload[0]
178    if zipfile.is_zipfile(payload_path):
179      with zipfile.ZipFile(payload_path, "r") as zfp:
180        payload_entry_name = 'payload.bin'
181        zfp.extract(payload_entry_name, tempdir)
182        payload_path = os.path.join(tempdir, payload_entry_name)
183    if args.output_dir is None:
184      args.output_dir = "."
185    if not os.path.exists(args.output_dir):
186      os.makedirs(args.output_dir, exist_ok=True)
187    assert os.path.isdir(args.output_dir)
188    run_ota(args.source, args.target, payload_path, tempdir, args.output_dir)
189
190
191if __name__ == '__main__':
192  main()
193