1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import common
18import logging
19import shlex
20import argparse
21import tempfile
22import zipfile
23import shutil
24from common import OPTIONS, OptionHandler
25from ota_signing_utils import AddSigningArgumentParse
26
27logger = logging.getLogger(__name__)
28
29OPTIONS.payload_signer = None
30OPTIONS.payload_signer_args = []
31OPTIONS.payload_signer_maximum_signature_size = None
32OPTIONS.package_key = None
33
34PAYLOAD_BIN = 'payload.bin'
35PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt'
36
37class SignerOptions(OptionHandler):
38
39  @staticmethod
40  def ParseOptions(o, a):
41    if o in ("-k", "--package_key"):
42      OPTIONS.package_key = a
43    elif o == "--payload_signer":
44      OPTIONS.payload_signer = a
45    elif o == "--payload_signer_args":
46      OPTIONS.payload_signer_args = shlex.split(a)
47    elif o == "--payload_signer_maximum_signature_size":
48      OPTIONS.payload_signer_maximum_signature_size = a
49    elif o == "--payload_signer_key_size":
50      # TODO(xunchang) remove this option after cleaning up the callers.
51      logger.warning("The option '--payload_signer_key_size' is deprecated."
52                      " Use '--payload_signer_maximum_signature_size' instead.")
53      OPTIONS.payload_signer_maximum_signature_size = a
54    else:
55      return False
56    return True
57
58  def __init__(self):
59    super().__init__(
60      ["payload_signer=",
61       "package_key=",
62       "payload_signer_args=",
63       "payload_signer_maximum_signature_size=",
64       "payload_signer_key_size="],
65       SignerOptions.ParseOptions
66    )
67
68
69signer_options = SignerOptions()
70
71
72class PayloadSigner(object):
73  """A class that wraps the payload signing works.
74
75  When generating a Payload, hashes of the payload and metadata files will be
76  signed with the device key, either by calling an external payload signer or
77  by calling openssl with the package key. This class provides a unified
78  interface, so that callers can just call PayloadSigner.Sign().
79
80  If an external payload signer has been specified (OPTIONS.payload_signer), it
81  calls the signer with the provided args (OPTIONS.payload_signer_args). Note
82  that the signing key should be provided as part of the payload_signer_args.
83  Otherwise without an external signer, it uses the package key
84  (OPTIONS.package_key) and calls openssl for the signing works.
85  """
86
87  def __init__(self, package_key=None, private_key_suffix=None, pw=None, payload_signer=None,
88               payload_signer_args=None, payload_signer_maximum_signature_size=None):
89    if package_key is None:
90      package_key = OPTIONS.package_key
91    if private_key_suffix is None:
92      private_key_suffix = OPTIONS.private_key_suffix
93    if payload_signer_args is None:
94      payload_signer_args = OPTIONS.payload_signer_args
95    if payload_signer_maximum_signature_size is None:
96      payload_signer_maximum_signature_size = OPTIONS.payload_signer_maximum_signature_size
97
98    if payload_signer is None:
99      # Prepare the payload signing key.
100      private_key = package_key + private_key_suffix
101
102      cmd = ["openssl", "pkcs8", "-in", private_key, "-inform", "DER"]
103      cmd.extend(["-passin", "pass:" + pw] if pw else ["-nocrypt"])
104      signing_key = common.MakeTempFile(prefix="key-", suffix=".key")
105      cmd.extend(["-out", signing_key])
106      common.RunAndCheckOutput(cmd, verbose=True)
107
108      self.signer = "openssl"
109      self.signer_args = ["pkeyutl", "-sign", "-inkey", signing_key,
110                          "-pkeyopt", "digest:sha256"]
111      self.maximum_signature_size = self._GetMaximumSignatureSizeInBytes(
112          signing_key)
113    else:
114      self.signer = payload_signer
115      self.signer_args = payload_signer_args
116      if payload_signer_maximum_signature_size:
117        self.maximum_signature_size = int(
118            payload_signer_maximum_signature_size)
119      else:
120        # The legacy config uses RSA2048 keys.
121        logger.warning("The maximum signature size for payload signer is not"
122                       " set, default to 256 bytes.")
123        self.maximum_signature_size = 256
124
125  @staticmethod
126  def _GetMaximumSignatureSizeInBytes(signing_key):
127    out_signature_size_file = common.MakeTempFile("signature_size")
128    cmd = ["delta_generator", "--out_maximum_signature_size_file={}".format(
129        out_signature_size_file), "--private_key={}".format(signing_key)]
130    common.RunAndCheckOutput(cmd, verbose=True)
131    with open(out_signature_size_file) as f:
132      signature_size = f.read().rstrip()
133    logger.info("%s outputs the maximum signature size: %s", cmd[0],
134                signature_size)
135    return int(signature_size)
136
137  @staticmethod
138  def _Run(cmd):
139    common.RunAndCheckOutput(cmd, stdout=None, stderr=None)
140
141  def SignPayload(self, unsigned_payload):
142
143    # 1. Generate hashes of the payload and metadata files.
144    payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
145    metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
146    cmd = ["delta_generator",
147           "--in_file=" + unsigned_payload,
148           "--signature_size=" + str(self.maximum_signature_size),
149           "--out_metadata_hash_file=" + metadata_sig_file,
150           "--out_hash_file=" + payload_sig_file]
151    self._Run(cmd)
152
153    # 2. Sign the hashes.
154    signed_payload_sig_file = self.SignHashFile(payload_sig_file)
155    signed_metadata_sig_file = self.SignHashFile(metadata_sig_file)
156
157    # 3. Insert the signatures back into the payload file.
158    signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
159                                              suffix=".bin")
160    cmd = ["delta_generator",
161           "--in_file=" + unsigned_payload,
162           "--out_file=" + signed_payload_file,
163           "--signature_size=" + str(self.maximum_signature_size),
164           "--metadata_signature_file=" + signed_metadata_sig_file,
165           "--payload_signature_file=" + signed_payload_sig_file]
166    self._Run(cmd)
167    return signed_payload_file
168
169  def SignHashFile(self, in_file):
170    """Signs the given input file. Returns the output filename."""
171    out_file = common.MakeTempFile(prefix="signed-", suffix=".bin")
172    cmd = [self.signer] + self.signer_args + ['-in', in_file, '-out', out_file]
173    common.RunAndCheckOutput(cmd)
174    return out_file
175
176def GeneratePayloadProperties(payload_file):
177    properties_file = common.MakeTempFile(prefix="payload-properties-",
178                                          suffix=".txt")
179    cmd = ["delta_generator",
180           "--in_file=" + payload_file,
181           "--properties_file=" + properties_file]
182    common.RunAndCheckOutput(cmd)
183    return properties_file
184
185def SignOtaPackage(input_path, output_path):
186  payload_signer = PayloadSigner(
187      OPTIONS.package_key, OPTIONS.private_key_suffix,
188      None, OPTIONS.payload_signer, OPTIONS.payload_signer_args)
189  common.ZipExclude(input_path, output_path, [PAYLOAD_BIN, PAYLOAD_PROPERTIES_TXT])
190  with tempfile.NamedTemporaryFile() as unsigned_payload, zipfile.ZipFile(input_path, "r", allowZip64=True) as zfp:
191    with zfp.open("payload.bin") as payload_fp:
192      shutil.copyfileobj(payload_fp, unsigned_payload)
193    signed_payload = payload_signer.SignPayload(unsigned_payload.name)
194    properties_file = GeneratePayloadProperties(signed_payload)
195    with zipfile.ZipFile(output_path, "a", compression=zipfile.ZIP_STORED, allowZip64=True) as output_zfp:
196      common.ZipWrite(output_zfp, signed_payload, PAYLOAD_BIN)
197      common.ZipWrite(output_zfp, properties_file, PAYLOAD_PROPERTIES_TXT)
198
199
200def main(argv):
201  parser = argparse.ArgumentParser(
202      prog=argv[0], description="Given a series of .img files, produces a full OTA package that installs thoese images")
203  parser.add_argument("input_ota", type=str,
204                      help="Input OTA for signing")
205  parser.add_argument('output_ota', type=str,
206                      help='Output OTA for the signed package')
207  parser.add_argument("-v", action="store_true",
208                      help="Enable verbose logging", dest="verbose")
209  AddSigningArgumentParse(parser)
210  args = parser.parse_args(argv[1:])
211  input_ota = args.input_ota
212  output_ota = args.output_ota
213  if args.verbose:
214    OPTIONS.verbose = True
215  common.InitLogging()
216  if args.package_key:
217    OPTIONS.package_key = args.package_key
218  logger.info("Re-signing OTA package {}".format(input_ota))
219  SignOtaPackage(input_ota, output_ota)
220
221if __name__ == "__main__":
222  import sys
223  main(sys.argv)