#!/usr/bin/env python
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for apexer."""

import hashlib
import json
import logging
import os
import shutil
import stat
import subprocess
import tempfile
import unittest
from importlib import resources
from zipfile import ZipFile

from apex_manifest import ValidateApexManifest
from apex_manifest import ParseApexManifest

logger = logging.getLogger(__name__)

TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
TEST_MANIFEST_JSON = os.path.join("testdata", "manifest.json")


def run(args, verbose=None, **kwargs):
    """Creates and returns a subprocess.Popen object.

    Args:
      args: The command represented as a list of strings.
      verbose: Whether the command should be logged through the module logger.
          Nothing is logged when it is None (the default) or otherwise falsy.
      kwargs: Any additional args to be passed to subprocess.Popen(), such as
          env, stdin, etc. stdout and stderr will default to subprocess.PIPE
          and subprocess.STDOUT respectively unless caller specifies any of
          them. universal_newlines will default to True, so the output is
          returned as a string.

    Returns:
      A subprocess.Popen object.
    """
    # Only install the defaults when the caller set neither stream; if either
    # one is given, both are left exactly as the caller specified them.
    if 'stdout' not in kwargs and 'stderr' not in kwargs:
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.STDOUT
    if 'universal_newlines' not in kwargs:
        kwargs['universal_newlines'] = True
    # In debug mode always show the command, independent of `verbose`.
    if DEBUG_TEST:
        print("\nRunning: \n%s\n" % " ".join(args))
    # Don't log any if caller explicitly says so.
    if verbose:
        logger.info(" Running: \"%s\"", " ".join(args))
    return subprocess.Popen(args, **kwargs)


def run_host_command(args, verbose=None, **kwargs):
    """Runs a host tool, resolving it inside the Android build tree if possible.

    When the ANDROID_BUILD_TOP environment variable is set (and non-empty),
    the executable name in args[0] is joined onto
    $ANDROID_BUILD_TOP/out/host/linux-x86/bin; otherwise the command is run as
    given.

    Args:
      args: The command represented as a list of strings. The list is not
          modified.
      verbose: Forwarded to run_and_check_output().
      kwargs: Forwarded to run_and_check_output().

    Returns:
      The output string of the command.

    Raises:
      RuntimeError: On non-zero exit from the command.
    """
    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
    if host_build_top:
        host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
        # Build a fresh list rather than assigning to args[0], so the
        # caller's command list is left untouched.
        args = [os.path.join(host_command_dir, args[0])] + list(args[1:])
    return run_and_check_output(args, verbose, **kwargs)


def run_and_check_output(args, verbose=None, **kwargs):
    """Runs the given command and returns the output.

    Args:
      args: The command represented as a list of strings.
      verbose: Whether the command and its output should be logged through the
          module logger. Nothing is logged when it is None or otherwise falsy.
      kwargs: Any additional args to be passed to subprocess.Popen(), such as
          env, stdin, etc. stdout and stderr will default to subprocess.PIPE
          and subprocess.STDOUT respectively unless caller specifies any of
          them.

    Returns:
      The output string. Empty if the process' stdout was not captured.

    Raises:
      RuntimeError: On non-zero exit from the command.
    """
    proc = run(args, verbose=verbose, **kwargs)
    output, _ = proc.communicate()
    # communicate() yields None for stdout when the caller redirected it
    # somewhere other than a pipe.
    if output is None:
        output = ""
    # Don't log any if caller explicitly says so.
    if verbose:
        logger.info("%s", output.rstrip())
    if proc.returncode != 0:
        raise RuntimeError(
            "Failed to run command '{}' (exit code {}):\n{}".format(
                args, proc.returncode, output))
    return output


def get_sha1sum(file_path):
    """Returns the hex digest of the file's contents.

    NOTE: despite the historical name, the digest is computed with SHA-256
    (hashlib.sha256), not SHA-1. The name is kept so existing callers keep
    working.

    Args:
      file_path: Path of the file to hash.

    Returns:
      The hexadecimal SHA-256 digest as a string.
    """
    h = hashlib.sha256()

    with open(file_path, 'rb') as file:
        while True:
            # Reading is buffered, so we can read smaller chunks.
            chunk = file.read(h.block_size)
            if not chunk:
                break
            h.update(chunk)

    return h.hexdigest()


def round_up(size, unit):
    """Rounds `size` up to the next multiple of `unit`.

    Args:
      size: The integer value to round.
      unit: The alignment; asserted to satisfy unit & (unit - 1) == 0, i.e. to
          be a power of two.

    Returns:
      The smallest multiple of `unit` that is greater than or equal to `size`.
    """
    assert unit & (unit - 1) == 0
    return (size + unit - 1) & (~(unit - 1))

# In order to debug test failures, set DEBUG_TEST to True and run the test from
# local workstation bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# the test will print out the command used, and the temporary files used by the
# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are
# different.
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs I had mild success using the vbindiff utility.
DEBUG_TEST = False


class ApexerRebuildTest(unittest.TestCase):
    """Repacks bundled test APEX files with apexer and checks the results.

    Each test extracts a prebuilt .apex packaged as a resource of the
    `apexer_test` package, rebuilds (parts of) it with the host tools and
    compares digests of the rebuilt artifact against the original.
    """

    # Environment variables this fixture overwrites while a test runs. Their
    # previous values are put back after every test so that directories of
    # finished tests do not pile up in PATH / LD_LIBRARY_PATH.
    _MANAGED_ENV_VARS = ("PATH", "LD_LIBRARY_PATH", "APEXER_TOOL_PATH")

    def setUp(self):
        """Prepares the cleanup list, environment restore and host tools."""
        self._to_cleanup = []
        saved_env = {name: os.environ.get(name) for name in self._MANAGED_ENV_VARS}
        # Registered before the tools are unpacked so the environment is
        # restored even if the remainder of setUp raises.
        self.addCleanup(self._restore_environ, saved_env)
        self._get_host_tools()

    def tearDown(self):
        """Deletes every temporary file/directory recorded by the test.

        With DEBUG_TEST enabled nothing is deleted; the recorded paths are
        printed instead so they can be inspected.
        """
        if DEBUG_TEST:
            print(self._to_cleanup)
            return
        for path in self._to_cleanup:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
            else:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    # Already gone; keep cleaning up the remaining entries.
                    pass
        del self._to_cleanup[:]

    def _restore_environ(self, saved_env):
        """Puts the environment variables in `saved_env` back as they were.

        Args:
          saved_env: Mapping of variable name to its previous value, or None
              if the variable was previously unset.
        """
        for name, value in saved_env.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value

    def _get_host_tools(self):
        """Unpacks the bundled host tools and exposes them to subprocesses.

        Extracts the `apexer_test_host_tools.zip` resource into a temporary
        directory, fills `self.host_tools` (tool name -> path inside the
        extracted `bin` directory, or the bare tool name when the archive
        does not contain it), sets `self.host_tools_path`, and prepends the
        extracted `bin` and `lib64` directories to PATH and LD_LIBRARY_PATH.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_")
        self._to_cleanup.append(dir_name)
        with resources.files("apexer_test").joinpath("apexer_test_host_tools.zip").open('rb') as f:
            with ZipFile(f, 'r') as zip_obj:
                zip_obj.extractall(path=dir_name)

        files = {}
        for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
                  "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
                  "signapk.jar", "android.jar", "blkid", "fsck.erofs", "conv_apex_manifest"]:
            file_path = os.path.join(dir_name, "bin", i)
            if os.path.exists(file_path):
                # Extraction does not keep the executable bit; set it again.
                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
                files[i] = file_path
            else:
                # Not bundled: fall back to the bare name.
                files[i] = i
        self.host_tools = files
        self.host_tools_path = os.path.join(dir_name, "bin")

        path = self.host_tools_path
        if "PATH" in os.environ:
            path += ":" + os.environ["PATH"]
        os.environ["PATH"] = path

        ld_library_path = os.path.join(dir_name, "lib64")
        if "LD_LIBRARY_PATH" in os.environ:
            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        os.environ["LD_LIBRARY_PATH"] = ld_library_path

    def _extract_resource(self, resource_name):
        """Copies a resource of the `apexer_test` package into a temp file.

        Args:
          resource_name: Resource path relative to the `apexer_test` package.

        Returns:
          The path of the temporary copy, which is registered for cleanup.
        """
        with (
            resources.files("apexer_test").joinpath(resource_name).open('rb') as f,
            tempfile.NamedTemporaryFile(prefix=resource_name.replace('/', '_'), delete=False) as f2,
        ):
            self._to_cleanup.append(f2.name)
            shutil.copyfileobj(f, f2)
            return f2.name

    def _get_container_files(self, apex_file_path):
        """Unzips an APEX container and locates its well-known entries.

        Asserts that `apex_manifest.pb`, `apex_build_info.pb` and a payload
        (`apex_payload.img` or `apex_payload.zip`) are present.

        Args:
          apex_file_path: Path of the .apex file to unzip.

        Returns:
          A dict mapping each entry name that exists to its extracted path,
          plus the key "apex_payload" pointing at the payload file.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_")
        self._to_cleanup.append(dir_name)
        with ZipFile(apex_file_path, 'r') as zip_obj:
            zip_obj.extractall(path=dir_name)
        files = {}
        for i in ["apex_manifest.json", "apex_manifest.pb",
                  "apex_build_info.pb", "assets",
                  "apex_payload.img", "apex_payload.zip"]:
            file_path = os.path.join(dir_name, i)
            if os.path.exists(file_path):
                files[i] = file_path
        self.assertIn("apex_manifest.pb", files)
        self.assertIn("apex_build_info.pb", files)

        # The .img payload takes precedence over the .zip one.
        image_file = None
        if "apex_payload.img" in files:
            image_file = files["apex_payload.img"]
        elif "apex_payload.zip" in files:
            image_file = files["apex_payload.zip"]
        self.assertIsNotNone(image_file)
        files["apex_payload"] = image_file

        return files

    def _strip_generated_payload_files(self, dir_name):
        """Removes payload files added by apexer and e2fs tools.

        Deletes `apex_manifest.json`, `apex_manifest.pb` and the `lost+found`
        directory from `dir_name` when they exist.

        Args:
          dir_name: Directory holding an extracted payload.
        """
        for i in ["apex_manifest.json", "apex_manifest.pb"]:
            generated_file = os.path.join(dir_name, i)
            if os.path.exists(generated_file):
                os.remove(generated_file)
        lost_found = os.path.join(dir_name, "lost+found")
        if os.path.isdir(lost_found):
            shutil.rmtree(lost_found)

    def _extract_payload_from_img(self, img_file_path):
        """Dumps the contents of a payload image with debugfs.

        Args:
          img_file_path: Path of the payload image to dump.

        Returns:
          The directory holding the dumped files, without the files that
          apexer and the e2fs tools add.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
        self._to_cleanup.append(dir_name)
        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
        run_host_command(cmd)

        self._strip_generated_payload_files(dir_name)
        return dir_name

    def _extract_payload(self, apex_file_path):
        """Extracts the payload of an APEX file with deapexer.

        Args:
          apex_file_path: Path of the .apex file.

        Returns:
          The directory holding the extracted files, without the files that
          apexer and the e2fs tools add.
        """
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
        self._to_cleanup.append(dir_name)
        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
               "--blkid_path", self.host_tools["blkid"], "--fsckerofs_path",
               self.host_tools["fsck.erofs"], "extract", apex_file_path, dir_name]
        run_host_command(cmd)

        self._strip_generated_payload_files(dir_name)
        return dir_name

    def _run_apexer(self, container_files, payload_dir, args=None):
        """Runs apexer over `payload_dir` and returns the output file path.

        Args:
          container_files: Dict as returned by _get_container_files().
          payload_dir: Directory with the payload contents to pack.
          args: Optional list of extra apexer flags. "--payload_only" and
              "--unsigned_payload_only" additionally change which options are
              passed and the suffix of the output file.

        Returns:
          Path of the generated file (suffix ".payload" for the payload-only
          modes, ".apex.unsigned" otherwise), registered for cleanup.
        """
        if args is None:
            args = []
        unsigned_payload_only = "--unsigned_payload_only" in args
        payload_only = unsigned_payload_only or "--payload_only" in args

        os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
            ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
        if DEBUG_TEST:
            cmd.append('-v')
        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
        if "apex_manifest.json" in container_files:
            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
        if not payload_only and "assets" in container_files:
            cmd.extend(["--assets_dir", container_files["assets"]])
        if not unsigned_payload_only:
            cmd.extend(["--key", self._extract_resource(TEST_PRIVATE_KEY)])
            cmd.extend(["--pubkey", self._extract_resource(TEST_AVB_PUBLIC_KEY)])
        cmd.extend(args)

        # Decide on output file name
        apex_suffix = ".payload" if payload_only else ".apex.unsigned"
        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
        os.close(fd)
        self._to_cleanup.append(fn)
        cmd.extend([payload_dir, fn])

        run_host_command(cmd)
        return fn

    def _get_java_toolchain(self):
        """Picks the java binary and the library path used to run signapk.

        The first match wins: the jdk17 prebuilt relative to the current
        directory, /jdk/jdk17, then the ANDROID_JAVA_TOOLCHAIN,
        ANDROID_JAVA_HOME and JAVA_HOME environment variables; otherwise
        plain "java".

        Returns:
          A two-element list [java_toolchain, java_dep_lib] where
          java_dep_lib is a ':'-separated library search path.
        """
        java_toolchain = "java"
        if os.path.isfile("prebuilts/jdk/jdk17/linux-x86/bin/java"):
            java_toolchain = "prebuilts/jdk/jdk17/linux-x86/bin/java"
        elif os.path.isfile("/jdk/jdk17/linux-x86/bin/java"):
            java_toolchain = "/jdk/jdk17/linux-x86/bin/java"
        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
        elif "ANDROID_JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
        elif "JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        if "ANDROID_BUILD_TOP" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                                               "out/host/linux-x86/lib64")

        return [java_toolchain, java_dep_lib]

    def _sign_apk_container(self, unsigned_apex):
        """Signs an unsigned APEX container with signapk.jar.

        Args:
          unsigned_apex: Path of the unsigned .apex file.

        Returns:
          Path of the signed .apex file, registered for cleanup.
        """
        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex")
        os.close(fd)
        self._to_cleanup.append(fn)
        java_toolchain, java_dep_lib = self._get_java_toolchain()
        cmd = [
            java_toolchain,
            "-Djava.library.path=" + java_dep_lib,
            "-jar", self.host_tools['signapk.jar'],
            "-a", "4096", "--align-file-size",
            self._extract_resource(TEST_X509_KEY),
            self._extract_resource(TEST_PK8_KEY),
            unsigned_apex, fn]
        run_and_check_output(cmd)
        return fn

    def _sign_payload(self, container_files, unsigned_payload):
        """Signs a copy of an unsigned payload image with avbtool.

        Args:
          container_files: Dict as returned by _get_container_files(); its
              "apex_manifest.pb" entry supplies the key name and the salt.
          unsigned_payload: Path of the unsigned payload image; it is copied,
              not modified.

        Returns:
          Path of the signed copy, registered for cleanup.
        """
        fd, signed_payload = \
            tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload")
        os.close(fd)
        self._to_cleanup.append(signed_payload)
        shutil.copyfile(unsigned_payload, signed_payload)

        cmd = ['avbtool']
        cmd.append('add_hashtree_footer')
        cmd.append('--do_not_generate_fec')
        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
        cmd.extend(['--hash_algorithm', 'sha256'])
        cmd.extend(['--key', self._extract_resource(TEST_PRIVATE_KEY)])
        manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"])
        ValidateApexManifest(manifest_apex)
        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
        # Set up the salt based on manifest content which includes name
        # and version
        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
        cmd.extend(['--salt', salt])
        cmd.extend(['--image', signed_payload])
        cmd.append('--no_hashtree')
        run_and_check_output(cmd)

        return signed_payload

    def _verify_payload(self, payload):
        """Verifies that the payload is properly signed by avbtool"""
        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
        run_and_check_output(cmd)

    def _run_build_test(self, apex_name):
        """Repacks and re-signs `<apex_name>.apex`, expecting an identical file.

        Args:
          apex_name: Resource name of the APEX without the ".apex" suffix.
        """
        apex_file_path = self._extract_resource(apex_name + ".apex")
        if DEBUG_TEST:
            # Keep a recognisably named copy of the input for manual diffing.
            fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex")
            os.close(fd)
            shutil.copyfile(apex_file_path, fn)
            self._to_cleanup.append(fn)
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

    def test_simple_apex(self):
        self._run_build_test(TEST_APEX)

    def test_legacy_apex(self):
        self._run_build_test(TEST_APEX_LEGACY)

    def test_output_payload_only(self):
        """Assert that payload-only output from apexer is same as the payload we get by unzipping
        apex.
        """
        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
        self._verify_payload(payload_only_file_path)
        self.assertEqual(get_sha1sum(payload_only_file_path),
                         get_sha1sum(container_files["apex_payload"]))

    def test_output_unsigned_payload_only(self):
        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
        same as the payload we get by unzipping apex.
        """
        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                           ["--unsigned_payload_only"])
        # The unsigned image must be rejected by the verification step.
        with self.assertRaises(RuntimeError) as error:
            self._verify_payload(unsigned_payload_only_file_path)
        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
        self.assertEqual(get_sha1sum(signed_payload),
                         get_sha1sum(container_files["apex_payload"]))

        # Now assert that given an unsigned image and the original container
        # files, we can produce an identical unsigned image.
        unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
        unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
                                                             ["--unsigned_payload_only"])
        self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
                         get_sha1sum(unsigned_payload_only_2_file_path))

    def test_apex_with_logging_parent(self):
        self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

    def test_apex_with_overridden_package_name(self):
        self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)

    def test_conv_apex_manifest(self):
        """Checks conv_apex_manifest's `proto` and `setprop` sub-commands."""
        # .pb generation from json
        manifest_json_path = self._extract_resource(TEST_MANIFEST_JSON)

        fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_manifest_", suffix=".pb")
        os.close(fd)
        self._to_cleanup.append(fn)
        cmd = [
            "conv_apex_manifest",
            "proto",
            manifest_json_path,
            "-o", fn]
        run_and_check_output(cmd)

        with open(manifest_json_path) as fd_json:
            manifest_json = json.load(fd_json)
        manifest_apex = ParseApexManifest(fn)
        ValidateApexManifest(manifest_apex)

        self.assertEqual(manifest_apex.name, manifest_json["name"])
        self.assertEqual(manifest_apex.version, manifest_json["version"])

        # setprop check on already generated .pb
        next_version = 20
        cmd = [
            "conv_apex_manifest",
            "setprop",
            "version", str(next_version),
            fn]
        run_and_check_output(cmd)

        manifest_apex = ParseApexManifest(fn)
        ValidateApexManifest(manifest_apex)

        self.assertEqual(manifest_apex.version, next_version)



if __name__ == '__main__':
    unittest.main(verbosity=2)