#!/usr/bin/env python3
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Tools for running host side simulation of an OTA update."""


import argparse
import contextlib
import filecmp
import os
import shutil
import subprocess
import sys
import tempfile
import zipfile

import update_payload

# Partition images are resized to a multiple of this many bytes (4 KiB).
_BLOCK_SIZE = 4096

# Magic for android sparse image format
# https://source.android.com/devices/bootloader/images
_SPARSE_IMAGE_MAGIC = b'\x3A\xFF\x26\xED'


def extract_file(zip_file_path, entry_name, target_file_path):
    """Copy one entry out of a target-files container into a file.

    Args:
        zip_file_path: Either an open ``zipfile.ZipFile`` or the path of a
            directory laid out like an unpacked archive.
        entry_name: Name of the entry, relative to the container root.
        target_file_path: Path of the file to write; it is overwritten.

    Raises:
        KeyError: The entry is missing from the zip archive.
        FileNotFoundError: The entry is missing from the directory.
        ValueError: ``zip_file_path`` is neither a ``ZipFile`` nor a
            directory.
    """
    # Locate the source before touching the destination, so that a missing
    # entry does not leave an empty output file behind.
    if isinstance(zip_file_path, zipfile.ZipFile):
        src_fp = zip_file_path.open(entry_name)
    elif os.path.isdir(zip_file_path):
        src_fp = open(os.path.join(zip_file_path, entry_name), "rb")
    else:
        raise ValueError(
            "Expected a ZipFile or a directory, got {!r}".format(zip_file_path))
    with src_fp, open(target_file_path, 'wb') as out_fp:
        shutil.copyfileobj(src_fp, out_fp)


def is_sparse_image(filepath):
    """Return True if the file starts with the Android sparse image magic."""
    with open(filepath, 'rb') as fp:
        return fp.read(len(_SPARSE_IMAGE_MAGIC)) == _SPARSE_IMAGE_MAGIC


def extract_img(zip_archive: zipfile.ZipFile, img_name, output_path, is_source):
    """Extract and unsparse a partition image from a target-files container.

    The image is looked up as ``IMAGES/<img_name>.img`` first and as
    ``RADIO/<img_name>.img`` if that fails. Sparse images are converted to
    raw images with the external ``simg2img`` tool. The result is then
    resized in place to a multiple of 4 KiB.

    Args:
        zip_archive: Container to read from; it is handed to extract_file(),
            so an unpacked directory path works as well as a ZipFile.
        img_name: Partition name, without directory or ``.img`` suffix.
        output_path: Where to write the raw image.
        is_source: True to round the size down (source image), False to
            round it up by zero padding (target image).

    Raises:
        KeyError, FileNotFoundError: The image is in neither IMAGES/ nor
            RADIO/.
        subprocess.CalledProcessError: ``simg2img`` exited with an error.
    """
    entry_name = "IMAGES/" + img_name + ".img"
    try:
        extract_file(zip_archive, entry_name, output_path)
    except (KeyError, FileNotFoundError) as e:
        print("Failed to extract", img_name,
              "from IMAGES/ dir, trying RADIO/", e)
        extract_file(zip_archive, "RADIO/" + img_name + ".img", output_path)
    if is_sparse_image(output_path):
        raw_img_path = output_path + ".raw"
        subprocess.check_output(["simg2img", output_path, raw_img_path])
        os.rename(raw_img_path, output_path)

    # delta_generator only supports images multiple of 4 KiB. For target images
    # we pad the data with zeros if needed, but for source images we truncate
    # down the data since the last block of the old image could be padded on
    # disk with unknown data.
    file_size = os.path.getsize(output_path)
    if file_size % _BLOCK_SIZE == 0:
        return
    if is_source:
        print("Rounding DOWN partition {} to a multiple of 4 KiB."
              .format(output_path))
        file_size = file_size & -_BLOCK_SIZE
    else:
        print("Rounding UP partition {} to a multiple of 4 KiB."
              .format(output_path))
        file_size = (file_size + _BLOCK_SIZE - 1) & -_BLOCK_SIZE
    with open(output_path, 'ab') as f:
        f.truncate(file_size)


def _open_target_files(path, stack):
    """Open |path| as a ZipFile owned by |stack| if it is a zip archive.

    Anything else (None, a directory path, a missing path) is returned
    unchanged.
    """
    if path and zipfile.is_zipfile(path):
        return stack.enter_context(zipfile.ZipFile(path))
    return path


def _target_files_exist(target_files):
    """Tell whether |target_files| is an open archive or an existing path."""
    if not target_files:
        return False
    return (isinstance(target_files, zipfile.ZipFile)
            or os.path.exists(target_files))


def run_ota(source, target, payload_path, tempdir, output_dir):
    """Run an OTA on host side.

    Applies the payload with the external ``delta_generator`` tool, writing
    each resulting partition to ``<tempdir>/target_<name>.img.actual``. If
    the target build is available, every result is compared against the
    image extracted from it and the verdict is printed; otherwise the
    results are copied to |output_dir|.

    Args:
        source: Target-files zip or unpacked directory of the source build.
            May be None unless the payload carries old partition info.
        target: Target-files zip or unpacked directory of the target build,
            or None to skip verification.
        payload_path: Path to the payload file.
        tempdir: Scratch directory for extracted and generated images.
        output_dir: Destination for generated images when |target| is not
            available.

    Raises:
        ValueError: The payload needs source images but |source| is not a
            valid zipfile or directory.
        subprocess.CalledProcessError: An external tool exited with an
            error.
    """
    payload = update_payload.Payload(payload_path)
    payload.Init()

    partition_names = []
    old_partitions = []
    new_partitions = []
    expected_new_partitions = []
    # The ExitStack closes any archive opened here once extraction is done.
    with contextlib.ExitStack() as stack:
        source = _open_target_files(source, stack)
        target = _open_target_files(target, stack)
        source_exist = _target_files_exist(source)
        target_exist = _target_files_exist(target)

        for part in payload.manifest.partitions:
            name = part.partition_name
            partition_names.append(name)
            old_image = os.path.join(tempdir, "source_" + name + ".img")
            new_image = os.path.join(tempdir, "target_" + name + ".img")
            if part.HasField("old_partition_info"):
                if not source_exist:
                    raise ValueError(
                        "source target file must point to a valid zipfile or "
                        "directory {}".format(source))
                print("Extracting source image for", name)
                extract_img(source, name, old_image, True)
            if target_exist:
                print("Extracting target image for", name)
                extract_img(target, name, new_image, False)

            old_partitions.append(old_image)
            scratch_image_name = new_image + ".actual"
            new_partitions.append(scratch_image_name)
            # Pre-size the output image to the size recorded in the manifest.
            with open(scratch_image_name, "wb") as fp:
                fp.truncate(part.new_partition_info.size)
            expected_new_partitions.append(new_image)

    delta_generator_args = ["delta_generator", "--in_file=" + payload_path]
    if payload.manifest.partial_update:
        delta_generator_args.append("--is_partial_update")
    if payload.is_incremental:
        delta_generator_args.append(
            "--old_partitions=" + ":".join(old_partitions))
    delta_generator_args.append(
        "--partition_names=" + ":".join(partition_names))
    delta_generator_args.append(
        "--new_partitions=" + ":".join(new_partitions))

    print("Running ", " ".join(delta_generator_args))
    subprocess.check_output(delta_generator_args)

    if not target_exist:
        # Nothing to verify against, so just hand the results to the caller.
        for part in new_partitions:
            print("Output written to", part)
            shutil.copy(part, output_dir)
        return

    valid = True
    for (expected_part, actual_part, part_name) in \
            zip(expected_new_partitions, new_partitions, partition_names):
        if filecmp.cmp(expected_part, actual_part):
            print("Partition `{}` is valid".format(part_name))
        else:
            valid = False
            print(
                "Partition `{}` is INVALID expected image: {} actual image: {}"
                .format(part_name, expected_part, actual_part))

    # The caller usually deletes |tempdir| on return, so give an interactive
    # user a chance to look at the mismatching images first.
    if not valid and sys.stdout.isatty():
        input("Paused to investigate invalid partitions, press any key to exit.")


def main():
    """Parse the command line and run the OTA simulation."""
    parser = argparse.ArgumentParser(
        description="Run host side simulation of OTA package")
    parser.add_argument(
        "--source",
        help="Target file zip for the source build",
        required=False)
    parser.add_argument(
        "--target",
        help="Target file zip for the target build",
        required=False)
    parser.add_argument(
        "-o",
        dest="output_dir",
        help="Output directory to put all images, current directory by default"
    )
    parser.add_argument(
        "payload",
        help="payload.bin for the OTA package, or a zip of OTA package itself",
        nargs=1)
    args = parser.parse_args()
    print(args)

    if args.output_dir is None:
        args.output_dir = "."
    if not os.path.exists(args.output_dir):
        os.makedirs(args.output_dir, exist_ok=True)
    if not os.path.isdir(args.output_dir):
        parser.error(
            "output path is not a directory: {}".format(args.output_dir))

    # pylint: disable=no-member
    with tempfile.TemporaryDirectory() as tempdir:
        payload_path = args.payload[0]
        # An OTA package zip carries the payload as its payload.bin entry.
        if zipfile.is_zipfile(payload_path):
            with zipfile.ZipFile(payload_path, "r") as zfp:
                payload_entry_name = 'payload.bin'
                zfp.extract(payload_entry_name, tempdir)
                payload_path = os.path.join(tempdir, payload_entry_name)
        run_ota(args.source, args.target, payload_path, tempdir,
                args.output_dir)


if __name__ == '__main__':
    main()