1#!/usr/bin/env python
2#
3# Copyright (C) 2009 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Check the signatures of all APKs in a target_files .zip file.  With
19-c, compare the signatures of each package to the ones in a separate
20target_files (usually a previously distributed build for the same
21device) and flag any changes.
22
23Usage:  check_target_file_signatures [flags] target_files
24
25  -c  (--compare_with)  <other_target_files>
26      Look for compatibility problems between the two sets of target
27      files (eg., packages whose keys have changed).
28
29  -l  (--local_cert_dirs)  <dir,dir,...>
30      Comma-separated list of top-level directories to scan for
31      .x509.pem files.  Defaults to "vendor,build".  Where cert files
32      can be found that match APK signatures, the filename will be
33      printed as the cert name, otherwise a hash of the cert plus its
34      subject string will be printed instead.
35
36  -t  (--text)
37      Dump the certificate information for both packages in comparison
38      mode (this output is normally suppressed).
39
40"""
41
42from __future__ import print_function
43
44import logging
45import os
46import os.path
47import re
48import subprocess
49import sys
50import zipfile
51
52import common
53
54if sys.hexversion < 0x02070000:
55  print("Python 2.7 or newer is required.", file=sys.stderr)
56  sys.exit(1)
57
58
59logger = logging.getLogger(__name__)
60
61
62OPTIONS = common.OPTIONS
63
64OPTIONS.text = False
65OPTIONS.compare_with = None
66OPTIONS.local_cert_dirs = ("vendor", "build")
67
68PROBLEMS = []
69PROBLEM_PREFIX = []
70
71
72def AddProblem(msg):
73  logger.error(msg)
74  PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)
75
76
77def Push(msg):
78  PROBLEM_PREFIX.append(msg)
79
80
81def Pop():
82  PROBLEM_PREFIX.pop()
83
84
85def Banner(msg):
86  print("-" * 70)
87  print("  ", msg)
88  print("-" * 70)
89
90
91def GetCertSubject(cert):
92  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
93                 stdin=subprocess.PIPE,
94                 stdout=subprocess.PIPE,
95                 universal_newlines=False)
96  out, err = p.communicate(cert)
97  if err and not err.strip():
98    return "(error reading cert subject)"
99  for line in out.decode().split("\n"):
100    line = line.strip()
101    if line.startswith("Subject:"):
102      return line[8:].strip()
103  return "(unknown cert subject)"
104
105
106class CertDB(object):
107
108  def __init__(self):
109    self.certs = {}
110
111  def Add(self, cert_digest, subject, name=None):
112    if cert_digest in self.certs:
113      if name:
114        self.certs[cert_digest] = self.certs[cert_digest] + "," + name
115    else:
116      if name is None:
117        name = "unknown cert %s (%s)" % (cert_digest[:12], subject)
118      self.certs[cert_digest] = name
119
120  def Get(self, cert_digest):
121    """Return the name for a given cert digest."""
122    return self.certs.get(cert_digest, None)
123
124  def FindLocalCerts(self):
125    to_load = []
126    for top in OPTIONS.local_cert_dirs:
127      for dirpath, _, filenames in os.walk(top):
128        certs = [os.path.join(dirpath, i)
129                 for i in filenames if i.endswith(".x509.pem")]
130        if certs:
131          to_load.extend(certs)
132
133    for i in to_load:
134      with open(i) as f:
135        cert = common.ParseCertificate(f.read())
136      name, _ = os.path.splitext(i)
137      name, _ = os.path.splitext(name)
138
139      cert_sha1 = common.sha1(cert).hexdigest()
140      cert_subject = GetCertSubject(cert)
141      self.Add(cert_sha1, cert_subject, name)
142
143
144ALL_CERTS = CertDB()
145
146
147def CertFromPKCS7(data, filename):
148  """Read the cert out of a PKCS#7-format file (which is what is
149  stored in a signed .apk)."""
150  Push(filename + ":")
151  try:
152    p = common.Run(["openssl", "pkcs7",
153                    "-inform", "DER",
154                    "-outform", "PEM",
155                    "-print_certs"],
156                   stdin=subprocess.PIPE,
157                   stdout=subprocess.PIPE,
158                   universal_newlines=False)
159    out, err = p.communicate(data)
160    if err and not err.strip():
161      AddProblem("error reading cert:\n" + err.decode())
162      return None
163
164    cert = common.ParseCertificate(out.decode())
165    if not cert:
166      AddProblem("error parsing cert output")
167      return None
168    return cert
169  finally:
170    Pop()
171
172
173class APK(object):
174
175  def __init__(self, full_filename, filename):
176    self.filename = filename
177    self.cert_digests = frozenset()
178    self.shared_uid = None
179    self.package = None
180
181    Push(filename+":")
182    try:
183      self.RecordCerts(full_filename)
184      self.ReadManifest(full_filename)
185    finally:
186      Pop()
187
188  def ReadCertsDeprecated(self, full_filename):
189    print("reading certs in deprecated way for {}".format(full_filename))
190    cert_digests = set()
191    with zipfile.ZipFile(full_filename) as apk:
192      for info in apk.infolist():
193        filename = info.filename
194        if (filename.startswith("META-INF/") and
195                info.filename.endswith((".DSA", ".RSA"))):
196          pkcs7 = apk.read(filename)
197          cert = CertFromPKCS7(pkcs7, filename)
198          if not cert:
199            continue
200          cert_sha1 = common.sha1(cert).hexdigest()
201          cert_subject = GetCertSubject(cert)
202          ALL_CERTS.Add(cert_sha1, cert_subject)
203          cert_digests.add(cert_sha1)
204    if not cert_digests:
205      AddProblem("No signature found")
206      return
207    self.cert_digests = frozenset(cert_digests)
208
209  def RecordCerts(self, full_filename):
210    """Parse and save the signature of an apk file."""
211
212    # Dump the cert info with apksigner
213    cmd = ["apksigner", "verify", "--print-certs", full_filename]
214    p = common.Run(cmd, stdout=subprocess.PIPE)
215    output, _ = p.communicate()
216    if p.returncode != 0:
217      self.ReadCertsDeprecated(full_filename)
218      return
219
220    # Sample output:
221    # Signer #1 certificate DN: ...
222    # Signer #1 certificate SHA-256 digest: ...
223    # Signer #1 certificate SHA-1 digest: ...
224    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-256 digest: 56be132b780656fe2444cd34326eb5d7aac91d2096abf0fe673a99270622ec87
225    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-1 digest: 19da94896ce4078c38ca695701f1dec741ec6d67
226    # ...
227    certs_info = {}
228    certificate_regex = re.compile(
229        r"(Signer (?:#[0-9]+|\(.*\))) (certificate .*):(.*)")
230    for line in output.splitlines():
231      m = certificate_regex.match(line)
232      if not m:
233        continue
234      signer, key, val = m.group(1), m.group(2), m.group(3)
235      if certs_info.get(signer):
236        certs_info[signer].update({key.strip(): val.strip()})
237      else:
238        certs_info.update({signer: {key.strip(): val.strip()}})
239    if not certs_info:
240      AddProblem("Failed to parse cert info")
241      return
242
243    cert_digests = set()
244    for signer, props in certs_info.items():
245      subject = props.get("certificate DN")
246      digest = props.get("certificate SHA-1 digest")
247      if not subject or not digest:
248        AddProblem("Failed to parse cert subject or digest")
249        return
250      ALL_CERTS.Add(digest, subject)
251      cert_digests.add(digest)
252    self.cert_digests = frozenset(cert_digests)
253
254  def ReadManifest(self, full_filename):
255    p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file",
256                    "AndroidManifest.xml"],
257                   stdout=subprocess.PIPE)
258    manifest, err = p.communicate()
259    if err:
260      AddProblem("failed to read manifest " + full_filename)
261      return
262
263    self.shared_uid = None
264    self.package = None
265
266    for line in manifest.split("\n"):
267      line = line.strip()
268      m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
269      if m:
270        name = m.group(1)
271        if name == "android:sharedUserId":
272          if self.shared_uid is not None:
273            AddProblem("multiple sharedUserId declarations " + full_filename)
274          self.shared_uid = m.group(2)
275        elif name == "package":
276          if self.package is not None:
277            AddProblem("multiple package declarations " + full_filename)
278          self.package = m.group(2)
279
280    if self.package is None:
281      AddProblem("no package declaration " + full_filename)
282
283
284class TargetFiles(object):
285  def __init__(self):
286    self.max_pkg_len = 30
287    self.max_fn_len = 20
288    self.apks = None
289    self.apks_by_basename = None
290    self.certmap = None
291
292  def LoadZipFile(self, filename):
293    # First read the APK certs file to figure out whether there are compressed
294    # APKs in the archive. If we do have compressed APKs in the archive, then we
295    # must decompress them individually before we perform any analysis.
296
297    # This is the list of wildcards of files we extract from |filename|.
298    apk_extensions = ['*.apk', '*.apex']
299
300    with zipfile.ZipFile(filename, "r") as input_zip:
301      self.certmap, compressed_extension = common.ReadApkCerts(input_zip)
302    if compressed_extension:
303      apk_extensions.append('*.apk' + compressed_extension)
304
305    d = common.UnzipTemp(filename, apk_extensions)
306    self.apks = {}
307    self.apks_by_basename = {}
308    for dirpath, _, filenames in os.walk(d):
309      for fn in filenames:
310        # Decompress compressed APKs before we begin processing them.
311        if compressed_extension and fn.endswith(compressed_extension):
312          # First strip the compressed extension from the file.
313          uncompressed_fn = fn[:-len(compressed_extension)]
314
315          # Decompress the compressed file to the output file.
316          common.Gunzip(os.path.join(dirpath, fn),
317                        os.path.join(dirpath, uncompressed_fn))
318
319          # Finally, delete the compressed file and use the uncompressed file
320          # for further processing. Note that the deletion is not strictly
321          # required, but is done here to ensure that we're not using too much
322          # space in the temporary directory.
323          os.remove(os.path.join(dirpath, fn))
324          fn = uncompressed_fn
325
326        if fn.endswith(('.apk', '.apex')):
327          fullname = os.path.join(dirpath, fn)
328          displayname = fullname[len(d)+1:]
329          apk = APK(fullname, displayname)
330          self.apks[apk.filename] = apk
331          self.apks_by_basename[os.path.basename(apk.filename)] = apk
332          if apk.package:
333            self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
334          self.max_fn_len = max(self.max_fn_len, len(apk.filename))
335
336  def CheckSharedUids(self):
337    """Look for any instances where packages signed with different
338    certs request the same sharedUserId."""
339    apks_by_uid = {}
340    for apk in self.apks.values():
341      if apk.shared_uid:
342        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)
343
344    for uid in sorted(apks_by_uid):
345      apks = apks_by_uid[uid]
346      for apk in apks[1:]:
347        if apk.certs != apks[0].certs:
348          break
349      else:
350        # all packages have the same set of certs; this uid is fine.
351        continue
352
353      AddProblem("different cert sets for packages with uid %s" % (uid,))
354
355      print("uid %s is shared by packages with different cert sets:" % (uid,))
356      for apk in apks:
357        print("%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename))
358        for digest in apk.cert_digests:
359          print("   ", ALL_CERTS.Get(digest))
360      print()
361
362  def CheckExternalSignatures(self):
363    for apk_filename, certname in self.certmap.items():
364      if certname == "EXTERNAL":
365        # Apps marked EXTERNAL should be signed with the test key
366        # during development, then manually re-signed after
367        # predexopting.  Consider it an error if this app is now
368        # signed with any key that is present in our tree.
369        apk = self.apks_by_basename[apk_filename]
370        signed_with_external = False
371        for digest in apk.cert_digests:
372          name = ALL_CERTS.Get(digest)
373          if name and name.startswith("unknown "):
374            signed_with_external = True
375
376        if not signed_with_external:
377          Push(apk.filename)
378          AddProblem("hasn't been signed with EXTERNAL cert")
379          Pop()
380
381  def PrintCerts(self):
382    """Display a table of packages grouped by cert."""
383    by_digest = {}
384    for apk in self.apks.values():
385      for digest in apk.cert_digests:
386        if apk.package:
387          by_digest.setdefault(digest, []).append((apk.package, apk))
388
389    order = [(-len(v), k) for (k, v) in by_digest.items()]
390    order.sort()
391
392    for _, digest in order:
393      print("%s:" % (ALL_CERTS.Get(digest),))
394      apks = by_digest[digest]
395      apks.sort(key=lambda x: x[0])
396      for i in range(1, len(apks)):
397        pkgname, apk = apks[i]
398        if pkgname == apks[i-1][0]:
399          print("Both {} and {} have same package name {}".format(
400              apk.filename, apks[i-1][1].filename, pkgname))
401      for _, apk in apks:
402        if apk.shared_uid:
403          print("  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
404                                        self.max_pkg_len, apk.package,
405                                        apk.shared_uid))
406        else:
407          print("  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package))
408      print()
409
410  def CompareWith(self, other):
411    """Look for instances where a given package that exists in both
412    self and other have different certs."""
413
414    all_apks = set(self.apks.keys())
415    all_apks.update(other.apks.keys())
416
417    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
418
419    by_digestpair = {}
420
421    for i in all_apks:
422      if i in self.apks:
423        if i in other.apks:
424          # in both; should have same set of certs
425          if self.apks[i].cert_digests != other.apks[i].cert_digests:
426            by_digestpair.setdefault((other.apks[i].cert_digests,
427                                      self.apks[i].cert_digests), []).append(i)
428        else:
429          print("%s [%s]: new APK (not in comparison target_files)" % (
430              i, self.apks[i].filename))
431      else:
432        if i in other.apks:
433          print("%s [%s]: removed APK (only in comparison target_files)" % (
434              i, other.apks[i].filename))
435
436    if by_digestpair:
437      AddProblem("some APKs changed certs")
438      Banner("APK signing differences")
439      for (old, new), packages in sorted(by_digestpair.items()):
440        for i, o in enumerate(old):
441          if i == 0:
442            print("was", ALL_CERTS.Get(o))
443          else:
444            print("   ", ALL_CERTS.Get(o))
445        for i, n in enumerate(new):
446          if i == 0:
447            print("now", ALL_CERTS.Get(n))
448          else:
449            print("   ", ALL_CERTS.Get(n))
450        for i in sorted(packages):
451          old_fn = other.apks[i].filename
452          new_fn = self.apks[i].filename
453          if old_fn == new_fn:
454            print("  %-*s  [%s]" % (max_pkg_len, i, old_fn))
455          else:
456            print("  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
457                                                  old_fn, new_fn))
458        print()
459
460
461def main(argv):
462  def option_handler(o, a):
463    if o in ("-c", "--compare_with"):
464      OPTIONS.compare_with = a
465    elif o in ("-l", "--local_cert_dirs"):
466      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
467    elif o in ("-t", "--text"):
468      OPTIONS.text = True
469    else:
470      return False
471    return True
472
473  args = common.ParseOptions(argv, __doc__,
474                             extra_opts="c:l:t",
475                             extra_long_opts=["compare_with=",
476                                              "local_cert_dirs="],
477                             extra_option_handler=option_handler)
478
479  if len(args) != 1:
480    common.Usage(__doc__)
481    sys.exit(1)
482
483  common.InitLogging()
484
485  ALL_CERTS.FindLocalCerts()
486
487  Push("input target_files:")
488  try:
489    target_files = TargetFiles()
490    target_files.LoadZipFile(args[0])
491  finally:
492    Pop()
493
494  compare_files = None
495  if OPTIONS.compare_with:
496    Push("comparison target_files:")
497    try:
498      compare_files = TargetFiles()
499      compare_files.LoadZipFile(OPTIONS.compare_with)
500    finally:
501      Pop()
502
503  if OPTIONS.text or not compare_files:
504    Banner("target files")
505    target_files.PrintCerts()
506  target_files.CheckSharedUids()
507  target_files.CheckExternalSignatures()
508  if compare_files:
509    if OPTIONS.text:
510      Banner("comparison files")
511      compare_files.PrintCerts()
512    target_files.CompareWith(compare_files)
513
514  if PROBLEMS:
515    print("%d problem(s) found:\n" % (len(PROBLEMS),))
516    for p in PROBLEMS:
517      print(p)
518    return 1
519
520  return 0
521
522
523if __name__ == '__main__':
524  try:
525    r = main(sys.argv[1:])
526    sys.exit(r)
527  finally:
528    common.Cleanup()
529