#!/usr/bin/env python
#
# Copyright (C) 2009 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Check the signatures of all APKs in a target_files .zip file. With
-c, compare the signatures of each package to the ones in a separate
target_files (usually a previously distributed build for the same
device) and flag any changes.

Usage: check_target_file_signatures [flags] target_files

  -c (--compare_with) <other_target_files>
      Look for compatibility problems between the two sets of target
      files (eg., packages whose keys have changed).

  -l (--local_cert_dirs) <dir,dir,...>
      Comma-separated list of top-level directories to scan for
      .x509.pem files. Defaults to "vendor,build". Where cert files
      can be found that match APK signatures, the filename will be
      printed as the cert name, otherwise a hash of the cert plus its
      subject string will be printed instead.

  -t (--text)
      Dump the certificate information for both packages in comparison
      mode (this output is normally suppressed).

"""

from __future__ import print_function

import logging
import os
import os.path
import re
import subprocess
import sys
import zipfile

import common

if sys.hexversion < 0x02070000:
  print("Python 2.7 or newer is required.", file=sys.stderr)
  sys.exit(1)


logger = logging.getLogger(__name__)


OPTIONS = common.OPTIONS

# Script-specific option defaults; overridden by the flag handler in main().
OPTIONS.text = False
OPTIONS.compare_with = None
OPTIONS.local_cert_dirs = ("vendor", "build")

# Every problem reported through AddProblem(), already prefixed with context.
PROBLEMS = []
# Stack of context strings (managed by Push/Pop) prepended to each problem.
PROBLEM_PREFIX = []


def AddProblem(msg):
  """Log msg as an error and record it, prefixed with the current context."""
  logger.error(msg)
  PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)


def Push(msg):
  """Push a context string onto the problem-prefix stack."""
  PROBLEM_PREFIX.append(msg)


def Pop():
  """Drop the most recently pushed context string."""
  PROBLEM_PREFIX.pop()


def Banner(msg):
  """Print msg framed by two 70-character dashed rules."""
  print("-" * 70)
  print(" ", msg)
  print("-" * 70)


def GetCertSubject(cert):
  """Return the "Subject:" value openssl reports for a DER-encoded cert.

  Args:
    cert: The certificate bytes, fed to `openssl x509 -inform DER` on stdin.

  Returns:
    The text following "Subject:" in openssl's output, or a parenthesised
    placeholder string when openssl fails or prints no subject line.
  """
  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
                 stdin=subprocess.PIPE,
                 stdout=subprocess.PIPE,
                 universal_newlines=False)
  out, _ = p.communicate(cert)
  # stderr is not piped here, so the exit status is the only reliable failure
  # signal (the previous stderr-based test could never fire).
  if p.returncode != 0:
    return "(error reading cert subject)"
  for line in out.decode().split("\n"):
    line = line.strip()
    if line.startswith("Subject:"):
      return line[8:].strip()
  return "(unknown cert subject)"


class CertDB(object):
  """Maps certificate digests to human-readable names."""

  def __init__(self):
    # cert digest (hex string) -> display name (comma-joined when several
    # names were registered for the same digest).
    self.certs = {}

  def Add(self, cert_digest, subject, name=None):
    """Register a certificate.

    Args:
      cert_digest: Hex digest identifying the certificate.
      subject: Subject string, used only to build a fallback name.
      name: Optional friendly name. For an already-known digest a given name
          is appended to the existing entry; for a new digest without a name,
          an "unknown cert <digest prefix> (<subject>)" label is stored.
    """
    if cert_digest in self.certs:
      if name:
        self.certs[cert_digest] = self.certs[cert_digest] + "," + name
    else:
      if name is None:
        name = "unknown cert %s (%s)" % (cert_digest[:12], subject)
      self.certs[cert_digest] = name

  def Get(self, cert_digest):
    """Return the name for a given cert digest."""
    return self.certs.get(cert_digest, None)

  def FindLocalCerts(self):
    """Register every *.x509.pem file under OPTIONS.local_cert_dirs.

    Each cert is named after its path with the ".x509.pem" suffix removed.
    """
    to_load = []
    for top in OPTIONS.local_cert_dirs:
      for dirpath, _, filenames in os.walk(top):
        to_load.extend(os.path.join(dirpath, i)
                       for i in filenames if i.endswith(".x509.pem"))

    for i in to_load:
      with open(i) as f:
        cert = common.ParseCertificate(f.read())
      # Strip both extensions: ".pem" first, then ".x509".
      name, _ = os.path.splitext(i)
      name, _ = os.path.splitext(name)

      cert_sha1 = common.sha1(cert).hexdigest()
      cert_subject = GetCertSubject(cert)
      self.Add(cert_sha1, cert_subject, name)


ALL_CERTS = CertDB()


def CertFromPKCS7(data, filename):
  """Read the cert out of a PKCS#7-format file (which is what is
  stored in a signed .apk).

  Args:
    data: Raw PKCS#7 bytes, fed to `openssl pkcs7 -inform DER` on stdin.
    filename: Name used as problem-report context.

  Returns:
    The value produced by common.ParseCertificate for openssl's PEM output,
    or None (after recording a problem) when openssl fails or the output
    cannot be parsed.
  """
  Push(filename + ":")
  try:
    p = common.Run(["openssl", "pkcs7",
                    "-inform", "DER",
                    "-outform", "PEM",
                    "-print_certs"],
                   stdin=subprocess.PIPE,
                   stdout=subprocess.PIPE,
                   universal_newlines=False)
    out, err = p.communicate(data)
    # Detect failure through the exit status; stderr is not piped above, so
    # err is normally None and must not be decoded unconditionally.
    if p.returncode != 0:
      msg = "error reading cert"
      if err:
        msg += ":\n" + err.decode()
      AddProblem(msg)
      return None

    cert = common.ParseCertificate(out.decode())
    if not cert:
      AddProblem("error parsing cert output")
      return None
    return cert
  finally:
    Pop()


class APK(object):
  """Signature and manifest information extracted from one package file."""

  def __init__(self, full_filename, filename):
    """Inspect the package at full_filename.

    Args:
      full_filename: Path of the package file on disk.
      filename: Display name, also used as problem-report context.
    """
    self.filename = filename
    self.cert_digests = frozenset()
    self.shared_uid = None
    self.package = None

    Push(filename+":")
    try:
      self.RecordCerts(full_filename)
      self.ReadManifest(full_filename)
    finally:
      Pop()

  def ReadCertsDeprecated(self, full_filename):
    """Collect cert digests from the META-INF/*.DSA and *.RSA entries.

    Fallback used by RecordCerts when apksigner exits with an error. Records
    a problem and leaves self.cert_digests untouched if nothing is found.
    """
    print("reading certs in deprecated way for {}".format(full_filename))
    cert_digests = set()
    with zipfile.ZipFile(full_filename) as apk:
      for info in apk.infolist():
        filename = info.filename
        if (filename.startswith("META-INF/") and
            filename.endswith((".DSA", ".RSA"))):
          pkcs7 = apk.read(filename)
          cert = CertFromPKCS7(pkcs7, filename)
          if not cert:
            continue
          cert_sha1 = common.sha1(cert).hexdigest()
          cert_subject = GetCertSubject(cert)
          ALL_CERTS.Add(cert_sha1, cert_subject)
          cert_digests.add(cert_sha1)
    if not cert_digests:
      AddProblem("No signature found")
      return
    self.cert_digests = frozenset(cert_digests)

  def RecordCerts(self, full_filename):
    """Parse and save the signature of an apk file."""

    # Dump the cert info with apksigner
    cmd = ["apksigner", "verify", "--print-certs", full_filename]
    p = common.Run(cmd, stdout=subprocess.PIPE)
    output, _ = p.communicate()
    if p.returncode != 0:
      self.ReadCertsDeprecated(full_filename)
      return

    # Sample output:
    # Signer #1 certificate DN: ...
    # Signer #1 certificate SHA-256 digest: ...
    # Signer #1 certificate SHA-1 digest: ...
    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-256 digest: 56be132b780656fe2444cd34326eb5d7aac91d2096abf0fe673a99270622ec87
    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-1 digest: 19da94896ce4078c38ca695701f1dec741ec6d67
    # ...
    certs_info = {}
    certificate_regex = re.compile(
        r"(Signer (?:#[0-9]+|\(.*\))) (certificate .*):(.*)")
    for line in output.splitlines():
      m = certificate_regex.match(line)
      if not m:
        continue
      signer, key, val = m.group(1), m.group(2), m.group(3)
      # Group the "certificate ..." properties per signer.
      certs_info.setdefault(signer, {})[key.strip()] = val.strip()
    if not certs_info:
      AddProblem("Failed to parse cert info")
      return

    cert_digests = set()
    for props in certs_info.values():
      subject = props.get("certificate DN")
      digest = props.get("certificate SHA-1 digest")
      if not subject or not digest:
        AddProblem("Failed to parse cert subject or digest")
        return
      ALL_CERTS.Add(digest, subject)
      cert_digests.add(digest)
    self.cert_digests = frozenset(cert_digests)

  def ReadManifest(self, full_filename):
    """Set self.package and self.shared_uid from the package's manifest.

    Parses the output of `aapt2 dump xmltree ... --file AndroidManifest.xml`.
    Problems are recorded when aapt2 fails, when either attribute is declared
    more than once, or when no package attribute is found.
    """
    p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file",
                    "AndroidManifest.xml"],
                   stdout=subprocess.PIPE)
    manifest, err = p.communicate()
    # stderr is not piped, so err alone cannot reveal a failure; the exit
    # status is checked as well.
    if err or p.returncode != 0:
      AddProblem("failed to read manifest " + full_filename)
      return

    self.shared_uid = None
    self.package = None

    # Matches attribute lines of the form: A: name(0x...)="value" (Raw ...
    attr_regex = re.compile(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw')
    # NOTE(review): manifest is split as text, which presumably relies on
    # common.Run returning text-mode output by default -- confirm.
    for line in manifest.split("\n"):
      m = attr_regex.search(line.strip())
      if not m:
        continue
      name = m.group(1)
      if name == "android:sharedUserId":
        if self.shared_uid is not None:
          AddProblem("multiple sharedUserId declarations " + full_filename)
        self.shared_uid = m.group(2)
      elif name == "package":
        if self.package is not None:
          AddProblem("multiple package declarations " + full_filename)
        self.package = m.group(2)

    if self.package is None:
      AddProblem("no package declaration " + full_filename)


class TargetFiles(object):
  """The packages found in one target_files zip, plus checks over them."""

  def __init__(self):
    # Column widths for the printed tables; grown as packages are loaded.
    self.max_pkg_len = 30
    self.max_fn_len = 20
    # display filename -> APK
    self.apks = None
    # basename of the display filename -> APK
    self.apks_by_basename = None
    # First element of the pair returned by common.ReadApkCerts; iterated as
    # filename -> cert name in CheckExternalSignatures.
    self.certmap = None

  def LoadZipFile(self, filename):
    """Unpack the package files in filename and build an APK for each."""
    # First read the APK certs file to figure out whether there are compressed
    # APKs in the archive. If we do have compressed APKs in the archive, then we
    # must decompress them individually before we perform any analysis.

    # This is the list of wildcards of files we extract from |filename|.
    apk_extensions = ['*.apk', '*.apex']

    with zipfile.ZipFile(filename, "r") as input_zip:
      self.certmap, compressed_extension = common.ReadApkCerts(input_zip)
    if compressed_extension:
      apk_extensions.append('*.apk' + compressed_extension)

    d = common.UnzipTemp(filename, apk_extensions)
    self.apks = {}
    self.apks_by_basename = {}
    for dirpath, _, filenames in os.walk(d):
      for fn in filenames:
        # Decompress compressed APKs before we begin processing them.
        if compressed_extension and fn.endswith(compressed_extension):
          # First strip the compressed extension from the file.
          uncompressed_fn = fn[:-len(compressed_extension)]

          # Decompress the compressed file to the output file.
          common.Gunzip(os.path.join(dirpath, fn),
                        os.path.join(dirpath, uncompressed_fn))

          # Finally, delete the compressed file and use the uncompressed file
          # for further processing. Note that the deletion is not strictly
          # required, but is done here to ensure that we're not using too much
          # space in the temporary directory.
          os.remove(os.path.join(dirpath, fn))
          fn = uncompressed_fn

        if fn.endswith(('.apk', '.apex')):
          fullname = os.path.join(dirpath, fn)
          # Path relative to the extraction directory.
          displayname = fullname[len(d)+1:]
          apk = APK(fullname, displayname)
          self.apks[apk.filename] = apk
          self.apks_by_basename[os.path.basename(apk.filename)] = apk
          if apk.package:
            self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
          self.max_fn_len = max(self.max_fn_len, len(apk.filename))

  def CheckSharedUids(self):
    """Look for any instances where packages signed with different
    certs request the same sharedUserId."""
    apks_by_uid = {}
    for apk in self.apks.values():
      if apk.shared_uid:
        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)

    for uid in sorted(apks_by_uid):
      apks = apks_by_uid[uid]
      # APK stores its signer set in cert_digests; compare every package
      # against the first one sharing this uid.
      reference = apks[0].cert_digests
      if all(apk.cert_digests == reference for apk in apks[1:]):
        # all packages have the same set of certs; this uid is fine.
        continue

      AddProblem("different cert sets for packages with uid %s" % (uid,))

      print("uid %s is shared by packages with different cert sets:" % (uid,))
      for apk in apks:
        print("%-*s [%s]" % (self.max_pkg_len, apk.package, apk.filename))
        for digest in apk.cert_digests:
          print(" ", ALL_CERTS.Get(digest))
      print()

  def CheckExternalSignatures(self):
    """Record a problem for each EXTERNAL package lacking an unknown cert."""
    for apk_filename, certname in self.certmap.items():
      if certname == "EXTERNAL":
        # Apps marked EXTERNAL should be signed with the test key
        # during development, then manually re-signed after
        # predexopting. Consider it an error if this app is now
        # signed with any key that is present in our tree.
        # NOTE(review): raises KeyError if the certs file names a package
        # that was not extracted from the zip -- confirm that cannot happen.
        apk = self.apks_by_basename[apk_filename]
        signed_with_external = False
        for digest in apk.cert_digests:
          name = ALL_CERTS.Get(digest)
          # Certs not matched to a local file get an "unknown cert ..." name.
          if name and name.startswith("unknown "):
            signed_with_external = True

        if not signed_with_external:
          Push(apk.filename)
          AddProblem("hasn't been signed with EXTERNAL cert")
          Pop()

  def PrintCerts(self):
    """Display a table of packages grouped by cert."""
    by_digest = {}
    for apk in self.apks.values():
      for digest in apk.cert_digests:
        if apk.package:
          by_digest.setdefault(digest, []).append((apk.package, apk))

    # Certs signing the most packages first; ties broken by digest.
    order = sorted((-len(v), k) for (k, v) in by_digest.items())

    for _, digest in order:
      print("%s:" % (ALL_CERTS.Get(digest),))
      apks = by_digest[digest]
      apks.sort(key=lambda x: x[0])
      # After sorting by package name, duplicates are adjacent.
      for (prev_name, prev_apk), (pkgname, apk) in zip(apks, apks[1:]):
        if pkgname == prev_name:
          print("Both {} and {} have same package name {}".format(
              apk.filename, prev_apk.filename, pkgname))
      for _, apk in apks:
        if apk.shared_uid:
          print(" %-*s %-*s [%s]" % (self.max_fn_len, apk.filename,
                                     self.max_pkg_len, apk.package,
                                     apk.shared_uid))
        else:
          print(" %-*s %s" % (self.max_fn_len, apk.filename, apk.package))
      print()

  def CompareWith(self, other):
    """Look for instances where a given package that exists in both
    self and other have different certs."""

    all_apks = set(self.apks.keys())
    all_apks.update(other.apks.keys())

    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)

    # (old digest set, new digest set) -> filenames whose certs changed so.
    by_digestpair = {}

    for i in all_apks:
      if i in self.apks:
        if i in other.apks:
          # in both; should have same set of certs
          if self.apks[i].cert_digests != other.apks[i].cert_digests:
            by_digestpair.setdefault((other.apks[i].cert_digests,
                                      self.apks[i].cert_digests), []).append(i)
        else:
          print("%s [%s]: new APK (not in comparison target_files)" % (
              i, self.apks[i].filename))
      else:
        if i in other.apks:
          print("%s [%s]: removed APK (only in comparison target_files)" % (
              i, other.apks[i].filename))

    if by_digestpair:
      AddProblem("some APKs changed certs")
      Banner("APK signing differences")

      def _pair_key(item):
        # frozenset's "<" is a subset test rather than a total order, so sort
        # on sorted digest lists to get a deterministic report.
        old_digests, new_digests = item[0]
        return (sorted(old_digests), sorted(new_digests))

      for (old, new), packages in sorted(by_digestpair.items(),
                                         key=_pair_key):
        for i, o in enumerate(sorted(old)):
          if i == 0:
            print("was", ALL_CERTS.Get(o))
          else:
            print(" ", ALL_CERTS.Get(o))
        for i, n in enumerate(sorted(new)):
          if i == 0:
            print("now", ALL_CERTS.Get(n))
          else:
            print(" ", ALL_CERTS.Get(n))
        for i in sorted(packages):
          old_fn = other.apks[i].filename
          new_fn = self.apks[i].filename
          if old_fn == new_fn:
            print(" %-*s [%s]" % (max_pkg_len, i, old_fn))
          else:
            print(" %-*s [was: %s; now: %s]" % (max_pkg_len, i,
                                                old_fn, new_fn))
        print()


def main(argv):
  """Run the checks described in the module docstring.

  Args:
    argv: Command-line arguments, excluding the program name.

  Returns:
    1 if any problem was recorded, 0 otherwise.
  """
  def option_handler(o, a):
    if o in ("-c", "--compare_with"):
      OPTIONS.compare_with = a
    elif o in ("-l", "--local_cert_dirs"):
      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
    elif o in ("-t", "--text"):
      OPTIONS.text = True
    else:
      return False
    return True

  args = common.ParseOptions(argv, __doc__,
                             extra_opts="c:l:t",
                             extra_long_opts=["compare_with=",
                                              "local_cert_dirs="],
                             extra_option_handler=option_handler)

  if len(args) != 1:
    common.Usage(__doc__)
    sys.exit(1)

  common.InitLogging()

  ALL_CERTS.FindLocalCerts()

  Push("input target_files:")
  try:
    target_files = TargetFiles()
    target_files.LoadZipFile(args[0])
  finally:
    Pop()

  compare_files = None
  if OPTIONS.compare_with:
    Push("comparison target_files:")
    try:
      compare_files = TargetFiles()
      compare_files.LoadZipFile(OPTIONS.compare_with)
    finally:
      Pop()

  # The per-cert tables are suppressed in comparison mode unless -t is given.
  if OPTIONS.text or not compare_files:
    Banner("target files")
    target_files.PrintCerts()
  target_files.CheckSharedUids()
  target_files.CheckExternalSignatures()
  if compare_files:
    if OPTIONS.text:
      Banner("comparison files")
      compare_files.PrintCerts()
    target_files.CompareWith(compare_files)

  if PROBLEMS:
    print("%d problem(s) found:\n" % (len(PROBLEMS),))
    for p in PROBLEMS:
      print(p)
    return 1

  return 0


if __name__ == '__main__':
  try:
    r = main(sys.argv[1:])
    sys.exit(r)
  finally:
    common.Cleanup()