# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Merge multiple partial OTA packages into a single OTA package.

Per the command-line epilog, the tool can also be used to re-sign a single
regular OTA.
"""

import argparse
import logging
import struct
import sys
import update_payload
import tempfile
import zipfile
import os
import care_map_pb2

import common
from typing import BinaryIO, List, Optional
from update_metadata_pb2 import DeltaArchiveManifest, DynamicPartitionMetadata, DynamicPartitionGroup
from ota_metadata_pb2 import OtaMetadata
from update_payload import Payload

from payload_signer import PayloadSigner
from ota_utils import PayloadGenerator, METADATA_PROTO_NAME, FinalizeMetadata
from ota_signing_utils import AddSigningArgumentParse


logger = logging.getLogger(__name__)

# Names of the optional zip entries that are carried over into the output.
CARE_MAP_ENTRY = "care_map.pb"
APEX_INFO_ENTRY = "apex_info.pb"


def WriteDataBlob(payload: Payload, outfp: BinaryIO, read_size=1024*64):
    """Copy the data blob of `payload` to `outfp` in bounded chunks.

    Args:
      payload: Payload whose `total_data_length` bytes of data are copied.
      outfp: Writable binary file object that receives the bytes.
      read_size: Upper bound on the number of bytes requested per read.
    """
    total = payload.total_data_length
    for offset in range(0, total, read_size):
        # The final chunk may be shorter than read_size.
        length = min(offset + read_size, total) - offset
        outfp.write(payload.ReadDataBlob(offset, length))


def ConcatBlobs(payloads: List[Payload], outfp: BinaryIO):
    """Write the data blobs of all `payloads` to `outfp`, in list order."""
    for payload in payloads:
        WriteDataBlob(payload, outfp)


def TotalDataLength(partitions):
    """Return the end offset of the last operation that carries data.

    Partitions and their operations are scanned from the end; the first
    operation found with a positive `data_length` determines the result
    (`data_offset + data_length`). Returns 0 when no operation has data.
    """
    for partition in reversed(partitions):
        for op in reversed(partition.operations):
            if op.data_length > 0:
                return op.data_offset + op.data_length
    return 0


def ExtendPartitionUpdates(partitions, new_partitions):
    """Append `new_partitions` to `partitions`, rebasing their data offsets.

    The data offsets of the appended operations are shifted by the length of
    the data already referenced by `partitions`, matching a layout in which
    the new payload's blob is concatenated after the existing blobs.

    Args:
      partitions: Repeated partition-update container, modified in place.
      new_partitions: Partition updates to append.
    """
    prefix_blob_length = TotalDataLength(partitions)
    # Remember where the appended entries start. Slicing with
    # `-len(new_partitions)` would select *every* partition when
    # `new_partitions` is empty (because `-0 == 0`) and wrongly shift the
    # offsets of the partitions that were already present.
    first_new_index = len(partitions)
    partitions.extend(new_partitions)
    for part in partitions[first_new_index:]:
        for op in part.operations:
            if op.HasField("data_length") and op.data_length != 0:
                op.data_offset += prefix_blob_length


class DuplicatePartitionError(ValueError):
    """Raised when the same partition appears in more than one OTA/group."""
    pass


def MergeDynamicPartitionGroups(groups: List[DynamicPartitionGroup], new_groups: List[DynamicPartitionGroup]):
    """Merge `new_groups` into `groups` in place, matching groups by name.

    For a group name present on both sides the partition names are
    concatenated and the larger `size` is kept. Groups that exist only in
    `new_groups` are appended to `groups`.

    Raises:
      DuplicatePartitionError: If two same-named groups share a partition.
    """
    # Index the incoming groups by name; entries are removed as they are
    # merged so that whatever is left afterwards is genuinely new.
    new_groups = {new_group.name: new_group for new_group in new_groups}
    for group in groups:
        if group.name not in new_groups:
            continue
        new_group = new_groups[group.name]
        common_partitions = set(group.partition_names).intersection(
            set(new_group.partition_names))
        if common_partitions:
            raise DuplicatePartitionError(
                f"Old group and new group should not have any intersections, {group.partition_names}, {new_group.partition_names}, common partitions: {common_partitions}")
        group.partition_names.extend(new_group.partition_names)
        group.size = max(new_group.size, group.size)
        del new_groups[group.name]
    for new_group in new_groups.values():
        groups.append(new_group)


def MergeDynamicPartitionMetadata(metadata: DynamicPartitionMetadata, new_metadata: DynamicPartitionMetadata):
    """Fold `new_metadata` into `metadata` in place.

    Groups are merged by name. `snapshot_enabled` and `vabc_enabled` stay
    true only if true on both sides, and the larger `cow_version` is kept.
    Both sides must agree on `vabc_compression_param`.

    Raises:
      DuplicatePartitionError: Propagated from the group merge.
      AssertionError: If the compression parameters differ.
    """
    MergeDynamicPartitionGroups(metadata.groups, new_metadata.groups)
    metadata.snapshot_enabled &= new_metadata.snapshot_enabled
    metadata.vabc_enabled &= new_metadata.vabc_enabled
    assert metadata.vabc_compression_param == new_metadata.vabc_compression_param, f"{metadata.vabc_compression_param} vs. {new_metadata.vabc_compression_param}"
    metadata.cow_version = max(metadata.cow_version, new_metadata.cow_version)


def MergeManifests(payloads: List[Payload]) -> Optional[DeltaArchiveManifest]:
    """Build a single manifest describing all `payloads`.

    Args:
      payloads: Payloads to merge, in the order their blobs are concatenated.

    Returns:
      None for an empty list; the sole payload's own manifest for a
      one-element list; otherwise a new manifest marked as a partial update.

    Raises:
      DuplicatePartitionError: If a partition appears in more than one
        payload's dynamic partition groups.
      AssertionError: If block sizes or VABC compression parameters differ.
    """
    if len(payloads) == 0:
        return None
    if len(payloads) == 1:
        return payloads[0].manifest

    first_manifest = payloads[0].manifest
    first_dpm = first_manifest.dynamic_partition_metadata

    output_manifest = DeltaArchiveManifest()
    output_manifest.block_size = first_manifest.block_size
    output_manifest.partial_update = True
    # Seed the flags from the first payload: the per-payload merge below
    # ANDs them, so starting from proto defaults would always yield False.
    output_dpm = output_manifest.dynamic_partition_metadata
    output_dpm.snapshot_enabled = first_dpm.snapshot_enabled
    output_dpm.vabc_enabled = first_dpm.vabc_enabled
    output_dpm.vabc_compression_param = first_dpm.vabc_compression_param

    # package_name -> apex info; a later payload overrides an earlier one.
    apex_info = {}
    for payload in payloads:
        manifest = payload.manifest
        assert manifest.block_size == output_manifest.block_size
        output_manifest.minor_version = max(
            output_manifest.minor_version, manifest.minor_version)
        output_manifest.max_timestamp = max(
            output_manifest.max_timestamp, manifest.max_timestamp)
        for apex in manifest.apex_info:
            apex_info[apex.package_name] = apex
        ExtendPartitionUpdates(output_manifest.partitions, manifest.partitions)
        try:
            MergeDynamicPartitionMetadata(
                output_manifest.dynamic_partition_metadata, manifest.dynamic_partition_metadata)
        except DuplicatePartitionError:
            logger.error(
                "OTA %s has duplicate partition with some of the previous OTAs", payload.name)
            raise

    # Emit every APEX exactly once, ordered by package name. `extend` needs
    # an iterable of messages, so the entries are passed as a list rather
    # than one message at a time.
    output_manifest.apex_info.extend(
        [apex_info[apex_name] for apex_name in sorted(apex_info)])

    return output_manifest


def MergePayloads(payloads: List[Payload]):
    """Concatenate the data blobs of `payloads` into a temporary file.

    NOTE(review): the temporary file is removed when the `with` block exits
    and nothing is returned, so this function currently has no lasting
    effect; it is kept for interface compatibility.
    """
    with tempfile.NamedTemporaryFile(prefix="payload_blob") as tmpfile:
        ConcatBlobs(payloads, tmpfile)


def MergeCareMap(paths: List[str]):
    """Combine the care maps found in the OTA zips at `paths`.

    Args:
      paths: Paths of OTA zip files; zips without a care map are skipped.

    Returns:
      The serialized merged CareMap, or b"" if no partitions were collected.
    """
    care_map = care_map_pb2.CareMap()
    for path in paths:
        with zipfile.ZipFile(path, "r", allowZip64=True) as zfp:
            if CARE_MAP_ENTRY in zfp.namelist():
                care_map_bytes = zfp.read(CARE_MAP_ENTRY)
                partial_care_map = care_map_pb2.CareMap()
                partial_care_map.ParseFromString(care_map_bytes)
                care_map.partitions.extend(partial_care_map.partitions)
    if len(care_map.partitions) == 0:
        return b""
    return care_map.SerializeToString()


def WriteHeaderAndManifest(manifest: DeltaArchiveManifest, fp: BinaryIO):
    """Write the payload header followed by the serialized `manifest`.

    The header is packed big-endian as: 4-byte magic, 64-bit major version,
    64-bit manifest size and a 32-bit field that is written as 0.
    """
    magic = b"CrAU"
    major_version = 2
    manifest_bytes = manifest.SerializeToString()
    # NOTE(review): the trailing 32-bit zero is presumably the metadata
    # signature size of a not-yet-signed payload - confirm against the
    # payload format definition.
    fp.write(struct.pack(">4sQQL", magic,
                         major_version, len(manifest_bytes), 0))
    fp.write(manifest_bytes)


def AddOtaMetadata(input_ota, metadata_ota, output_ota, package_key, pw):
    """Finalize `input_ota` into `output_ota` using metadata from `metadata_ota`.

    Args:
      input_ota: Path of the zip to finalize.
      metadata_ota: Path of the OTA zip whose metadata proto entry is reused.
      output_ota: Path of the finalized output package.
      package_key: Key forwarded to FinalizeMetadata.
      pw: Password for `package_key`, forwarded to FinalizeMetadata.

    Returns:
      `output_ota`.
    """
    with zipfile.ZipFile(metadata_ota, 'r') as zfp:
        metadata = OtaMetadata()
        metadata.ParseFromString(zfp.read(METADATA_PROTO_NAME))
        FinalizeMetadata(metadata, input_ota, output_ota,
                         package_key=package_key, pw=pw)
        return output_ota


def CheckOutput(output_ota):
    """Load `output_ota` as a payload and run its operation data hash check."""
    payload = update_payload.Payload(output_ota)
    payload.CheckOpDataHash()


def CheckDuplicatePartitions(payloads: List[Payload]):
    """Verify that no partition is listed by more than one dynamic group.

    Raises:
      DuplicatePartitionError: Naming the two OTAs and the shared partition.
    """
    partition_to_ota = {}
    for payload in payloads:
        for group in payload.manifest.dynamic_partition_metadata.groups:
            for part in group.partition_names:
                if part in partition_to_ota:
                    raise DuplicatePartitionError(
                        f"OTA {partition_to_ota[part].name} and {payload.name} have duplicating partition {part}")
                partition_to_ota[part] = payload


def ApexInfo(file_paths):
    """Return the raw apex_info.pb bytes of a single input package.

    Args:
      file_paths: Paths of the input zips.

    Returns:
      The entry's bytes when exactly one path is given and its zip contains
      the entry; None when several paths are given or the entry is absent.
    """
    if len(file_paths) > 1:
        logger.info("More than one target file specified, will ignore "
                    "apex_info.pb (if any)")
        return None
    with zipfile.ZipFile(file_paths[0], "r", allowZip64=True) as zfp:
        if APEX_INFO_ENTRY in zfp.namelist():
            apex_info_bytes = zfp.read(APEX_INFO_ENTRY)
            return apex_info_bytes
    return None


def main(argv):
    """Command-line entry point.

    Args:
      argv: Full argument vector; argv[0] (the program name) is skipped.

    Returns:
      0 on success.

    Raises:
      FileNotFoundError: If the OTA package providing build metadata does
        not exist.
    """
    parser = argparse.ArgumentParser(description='Merge multiple partial OTAs')
    parser.add_argument('packages', type=str, nargs='+',
                        help='Paths to OTA packages to merge')
    parser.add_argument('--output', type=str,
                        help='Paths to output merged ota', required=True)
    parser.add_argument('--metadata_ota', type=str,
                        help='Output zip will use build metadata from this OTA package, if unspecified, use the last OTA package in merge list')
    parser.add_argument('-v', action="store_true",
                        help="Enable verbose logging", dest="verbose")
    AddSigningArgumentParse(parser)

    parser.epilog = ('This tool can also be used to resign a regular OTA. For a single regular OTA, '
                     'apex_info.pb will be written to output. When merging multiple OTAs, '
                     'apex_info.pb will not be written.')
    args = parser.parse_args(argv[1:])
    file_paths = args.packages

    common.OPTIONS.verbose = args.verbose
    if args.verbose:
        logger.setLevel(logging.INFO)

    logger.info(args)
    if args.search_path:
        common.OPTIONS.search_path = args.search_path

    metadata_ota = args.packages[-1]
    if args.metadata_ota is not None:
        metadata_ota = args.metadata_ota
    # Validate with an explicit exception: an `assert` would be stripped
    # when Python runs with -O.
    if not os.path.exists(metadata_ota):
        raise FileNotFoundError(
            f"Metadata OTA package does not exist: {metadata_ota}")

    payloads = [Payload(path) for path in file_paths]

    CheckDuplicatePartitions(payloads)

    merged_manifest = MergeManifests(payloads)

    # Get signing keys. Looked up once and reused for both payload signing
    # and package finalization.
    key_passwords = common.GetKeyPasswords([args.package_key])

    apex_info_bytes = ApexInfo(file_paths)

    with tempfile.NamedTemporaryFile() as unsigned_payload:
        WriteHeaderAndManifest(merged_manifest, unsigned_payload)
        ConcatBlobs(payloads, unsigned_payload)
        # Make the bytes visible to code that reopens the file by name.
        unsigned_payload.flush()

        generator = PayloadGenerator()
        generator.payload_file = unsigned_payload.name
        logger.info("Payload size: %d",
                    os.path.getsize(generator.payload_file))

        if args.package_key:
            logger.info("Signing payload...")
            # TODO: remove OPTIONS when no longer used as fallback in payload_signer
            common.OPTIONS.payload_signer_args = None
            common.OPTIONS.payload_signer_maximum_signature_size = None
            signer = PayloadSigner(args.package_key, args.private_key_suffix,
                                   key_passwords[args.package_key],
                                   payload_signer=args.payload_signer,
                                   payload_signer_args=args.payload_signer_args,
                                   payload_signer_maximum_signature_size=args.payload_signer_maximum_signature_size)
            generator.Sign(signer)

        logger.info("Payload size: %d",
                    os.path.getsize(generator.payload_file))

        logger.info("Writing to %s", args.output)

        with tempfile.NamedTemporaryFile(prefix="signed_ota", suffix=".zip") as signed_ota:
            with zipfile.ZipFile(signed_ota, "w") as zfp:
                generator.WriteToZip(zfp)
                care_map_bytes = MergeCareMap(args.packages)
                if care_map_bytes:
                    common.ZipWriteStr(zfp, CARE_MAP_ENTRY, care_map_bytes)
                if apex_info_bytes:
                    logger.info("Writing %s", APEX_INFO_ENTRY)
                    common.ZipWriteStr(zfp, APEX_INFO_ENTRY, apex_info_bytes)
            # The zip must be closed before it is finalized into the output.
            AddOtaMetadata(signed_ota.name, metadata_ota,
                           args.output, args.package_key, key_passwords[args.package_key])
    return 0


if __name__ == '__main__':
    logging.basicConfig()
    sys.exit(main(sys.argv))