1#!/usr/bin/env python3 2# 3# Copyright (C) 2022 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import common 18import logging 19import shlex 20import argparse 21import tempfile 22import zipfile 23import shutil 24from common import OPTIONS, OptionHandler 25from ota_signing_utils import AddSigningArgumentParse 26 27logger = logging.getLogger(__name__) 28 29OPTIONS.payload_signer = None 30OPTIONS.payload_signer_args = [] 31OPTIONS.payload_signer_maximum_signature_size = None 32OPTIONS.package_key = None 33 34PAYLOAD_BIN = 'payload.bin' 35PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt' 36 37class SignerOptions(OptionHandler): 38 39 @staticmethod 40 def ParseOptions(o, a): 41 if o in ("-k", "--package_key"): 42 OPTIONS.package_key = a 43 elif o == "--payload_signer": 44 OPTIONS.payload_signer = a 45 elif o == "--payload_signer_args": 46 OPTIONS.payload_signer_args = shlex.split(a) 47 elif o == "--payload_signer_maximum_signature_size": 48 OPTIONS.payload_signer_maximum_signature_size = a 49 elif o == "--payload_signer_key_size": 50 # TODO(xunchang) remove this option after cleaning up the callers. 51 logger.warning("The option '--payload_signer_key_size' is deprecated." 52 " Use '--payload_signer_maximum_signature_size' instead.") 53 OPTIONS.payload_signer_maximum_signature_size = a 54 else: 55 return False 56 return True 57 58 def __init__(self): 59 super().__init__( 60 ["payload_signer=", 61 "package_key=", 62 "payload_signer_args=", 63 "payload_signer_maximum_signature_size=", 64 "payload_signer_key_size="], 65 SignerOptions.ParseOptions 66 ) 67 68 69signer_options = SignerOptions() 70 71 72class PayloadSigner(object): 73 """A class that wraps the payload signing works. 74 75 When generating a Payload, hashes of the payload and metadata files will be 76 signed with the device key, either by calling an external payload signer or 77 by calling openssl with the package key. This class provides a unified 78 interface, so that callers can just call PayloadSigner.Sign(). 79 80 If an external payload signer has been specified (OPTIONS.payload_signer), it 81 calls the signer with the provided args (OPTIONS.payload_signer_args). Note 82 that the signing key should be provided as part of the payload_signer_args. 83 Otherwise without an external signer, it uses the package key 84 (OPTIONS.package_key) and calls openssl for the signing works. 85 """ 86 87 def __init__(self, package_key=None, private_key_suffix=None, pw=None, payload_signer=None, 88 payload_signer_args=None, payload_signer_maximum_signature_size=None): 89 if package_key is None: 90 package_key = OPTIONS.package_key 91 if private_key_suffix is None: 92 private_key_suffix = OPTIONS.private_key_suffix 93 if payload_signer_args is None: 94 payload_signer_args = OPTIONS.payload_signer_args 95 if payload_signer_maximum_signature_size is None: 96 payload_signer_maximum_signature_size = OPTIONS.payload_signer_maximum_signature_size 97 98 if payload_signer is None: 99 # Prepare the payload signing key. 100 private_key = package_key + private_key_suffix 101 102 cmd = ["openssl", "pkcs8", "-in", private_key, "-inform", "DER"] 103 cmd.extend(["-passin", "pass:" + pw] if pw else ["-nocrypt"]) 104 signing_key = common.MakeTempFile(prefix="key-", suffix=".key") 105 cmd.extend(["-out", signing_key]) 106 common.RunAndCheckOutput(cmd, verbose=True) 107 108 self.signer = "openssl" 109 self.signer_args = ["pkeyutl", "-sign", "-inkey", signing_key, 110 "-pkeyopt", "digest:sha256"] 111 self.maximum_signature_size = self._GetMaximumSignatureSizeInBytes( 112 signing_key) 113 else: 114 self.signer = payload_signer 115 self.signer_args = payload_signer_args 116 if payload_signer_maximum_signature_size: 117 self.maximum_signature_size = int( 118 payload_signer_maximum_signature_size) 119 else: 120 # The legacy config uses RSA2048 keys. 121 logger.warning("The maximum signature size for payload signer is not" 122 " set, default to 256 bytes.") 123 self.maximum_signature_size = 256 124 125 @staticmethod 126 def _GetMaximumSignatureSizeInBytes(signing_key): 127 out_signature_size_file = common.MakeTempFile("signature_size") 128 cmd = ["delta_generator", "--out_maximum_signature_size_file={}".format( 129 out_signature_size_file), "--private_key={}".format(signing_key)] 130 common.RunAndCheckOutput(cmd, verbose=True) 131 with open(out_signature_size_file) as f: 132 signature_size = f.read().rstrip() 133 logger.info("%s outputs the maximum signature size: %s", cmd[0], 134 signature_size) 135 return int(signature_size) 136 137 @staticmethod 138 def _Run(cmd): 139 common.RunAndCheckOutput(cmd, stdout=None, stderr=None) 140 141 def SignPayload(self, unsigned_payload): 142 143 # 1. Generate hashes of the payload and metadata files. 144 payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") 145 metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") 146 cmd = ["delta_generator", 147 "--in_file=" + unsigned_payload, 148 "--signature_size=" + str(self.maximum_signature_size), 149 "--out_metadata_hash_file=" + metadata_sig_file, 150 "--out_hash_file=" + payload_sig_file] 151 self._Run(cmd) 152 153 # 2. Sign the hashes. 154 signed_payload_sig_file = self.SignHashFile(payload_sig_file) 155 signed_metadata_sig_file = self.SignHashFile(metadata_sig_file) 156 157 # 3. Insert the signatures back into the payload file. 158 signed_payload_file = common.MakeTempFile(prefix="signed-payload-", 159 suffix=".bin") 160 cmd = ["delta_generator", 161 "--in_file=" + unsigned_payload, 162 "--out_file=" + signed_payload_file, 163 "--signature_size=" + str(self.maximum_signature_size), 164 "--metadata_signature_file=" + signed_metadata_sig_file, 165 "--payload_signature_file=" + signed_payload_sig_file] 166 self._Run(cmd) 167 return signed_payload_file 168 169 def SignHashFile(self, in_file): 170 """Signs the given input file. Returns the output filename.""" 171 out_file = common.MakeTempFile(prefix="signed-", suffix=".bin") 172 cmd = [self.signer] + self.signer_args + ['-in', in_file, '-out', out_file] 173 common.RunAndCheckOutput(cmd) 174 return out_file 175 176def GeneratePayloadProperties(payload_file): 177 properties_file = common.MakeTempFile(prefix="payload-properties-", 178 suffix=".txt") 179 cmd = ["delta_generator", 180 "--in_file=" + payload_file, 181 "--properties_file=" + properties_file] 182 common.RunAndCheckOutput(cmd) 183 return properties_file 184 185def SignOtaPackage(input_path, output_path): 186 payload_signer = PayloadSigner( 187 OPTIONS.package_key, OPTIONS.private_key_suffix, 188 None, OPTIONS.payload_signer, OPTIONS.payload_signer_args) 189 common.ZipExclude(input_path, output_path, [PAYLOAD_BIN, PAYLOAD_PROPERTIES_TXT]) 190 with tempfile.NamedTemporaryFile() as unsigned_payload, zipfile.ZipFile(input_path, "r", allowZip64=True) as zfp: 191 with zfp.open("payload.bin") as payload_fp: 192 shutil.copyfileobj(payload_fp, unsigned_payload) 193 signed_payload = payload_signer.SignPayload(unsigned_payload.name) 194 properties_file = GeneratePayloadProperties(signed_payload) 195 with zipfile.ZipFile(output_path, "a", compression=zipfile.ZIP_STORED, allowZip64=True) as output_zfp: 196 common.ZipWrite(output_zfp, signed_payload, PAYLOAD_BIN) 197 common.ZipWrite(output_zfp, properties_file, PAYLOAD_PROPERTIES_TXT) 198 199 200def main(argv): 201 parser = argparse.ArgumentParser( 202 prog=argv[0], description="Given a series of .img files, produces a full OTA package that installs thoese images") 203 parser.add_argument("input_ota", type=str, 204 help="Input OTA for signing") 205 parser.add_argument('output_ota', type=str, 206 help='Output OTA for the signed package') 207 parser.add_argument("-v", action="store_true", 208 help="Enable verbose logging", dest="verbose") 209 AddSigningArgumentParse(parser) 210 args = parser.parse_args(argv[1:]) 211 input_ota = args.input_ota 212 output_ota = args.output_ota 213 if args.verbose: 214 OPTIONS.verbose = True 215 common.InitLogging() 216 if args.package_key: 217 OPTIONS.package_key = args.package_key 218 logger.info("Re-signing OTA package {}".format(input_ota)) 219 SignOtaPackage(input_ota, output_ota) 220 221if __name__ == "__main__": 222 import sys 223 main(sys.argv)