1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apexer."""
18
19import hashlib
20import json
21import logging
22import os
23import shutil
24import stat
25import subprocess
26import tempfile
27import unittest
28from importlib import resources
29from zipfile import ZipFile
30
31from apex_manifest import ValidateApexManifest
32from apex_manifest import ParseApexManifest
33
34logger = logging.getLogger(__name__)
35
36TEST_APEX = "com.android.example.apex"
37TEST_APEX_LEGACY = "com.android.example-legacy.apex"
38TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
39TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"
40
41TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
42TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
43TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
44TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
45TEST_MANIFEST_JSON = os.path.join("testdata", "manifest.json")
46
47def run(args, verbose=None, **kwargs):
48    """Creates and returns a subprocess.Popen object.
49
50    Args:
51      args: The command represented as a list of strings.
52      verbose: Whether the commands should be shown. Default to the global
53          verbosity if unspecified.
54      kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
55          stdin, etc. stdout and stderr will default to subprocess.PIPE and
56          subprocess.STDOUT respectively unless caller specifies any of them.
57          universal_newlines will default to True, as most of the users in
58          releasetools expect string output.
59
60    Returns:
61      A subprocess.Popen object.
62    """
63    if 'stdout' not in kwargs and 'stderr' not in kwargs:
64        kwargs['stdout'] = subprocess.PIPE
65        kwargs['stderr'] = subprocess.STDOUT
66    if 'universal_newlines' not in kwargs:
67        kwargs['universal_newlines'] = True
68    # Don't log any if caller explicitly says so.
69    if DEBUG_TEST:
70        print("\nRunning: \n%s\n" % " ".join(args))
71    if verbose:
72        logger.info("  Running: \"%s\"", " ".join(args))
73    return subprocess.Popen(args, **kwargs)
74
75
76def run_host_command(args, verbose=None, **kwargs):
77    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
78    if host_build_top:
79        host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
80        args[0] = os.path.join(host_command_dir, args[0])
81    return run_and_check_output(args, verbose, **kwargs)
82
83
84def run_and_check_output(args, verbose=None, **kwargs):
85    """Runs the given command and returns the output.
86
87    Args:
88      args: The command represented as a list of strings.
89      verbose: Whether the commands should be shown. Default to the global
90          verbosity if unspecified.
91      kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
92          stdin, etc. stdout and stderr will default to subprocess.PIPE and
93          subprocess.STDOUT respectively unless caller specifies any of them.
94
95    Returns:
96      The output string.
97
98    Raises:
99      ExternalError: On non-zero exit from the command.
100    """
101    proc = run(args, verbose=verbose, **kwargs)
102    output, _ = proc.communicate()
103    if output is None:
104        output = ""
105    # Don't log any if caller explicitly says so.
106    if verbose:
107        logger.info("%s", output.rstrip())
108    if proc.returncode != 0:
109        raise RuntimeError(
110            "Failed to run command '{}' (exit code {}):\n{}".format(
111                args, proc.returncode, output))
112    return output
113
114
115def get_sha1sum(file_path):
116    h = hashlib.sha256()
117
118    with open(file_path, 'rb') as file:
119        while True:
120            # Reading is buffered, so we can read smaller chunks.
121            chunk = file.read(h.block_size)
122            if not chunk:
123                break
124            h.update(chunk)
125
126    return h.hexdigest()
127
128
129def round_up(size, unit):
130    assert unit & (unit - 1) == 0
131    return (size + unit - 1) & (~(unit - 1))
132
133# In order to debug test failures, set DEBUG_TEST to True and run the test from
134# local workstation bypassing atest, e.g.:
135# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
136#
137# the test will print out the command used, and the temporary files used by the
138# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
139# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are
140# different.
141# A simple script to analyze the differences:
142#
143# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
144# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
145#
146# cd ~/tmp/
147# rm -rf input output
148# mkdir input output
149# unzip ${FILE_INPUT} -d input/
150# unzip ${FILE_OUTPUT} -d output/
151#
152# diff -r input/ output/
153#
154# For analyzing binary diffs I had mild success using the vbindiff utility.
155DEBUG_TEST = False
156
157
158class ApexerRebuildTest(unittest.TestCase):
159    def setUp(self):
160        self._to_cleanup = []
161        self._get_host_tools()
162
163    def tearDown(self):
164        if not DEBUG_TEST:
165            for i in self._to_cleanup:
166                if os.path.isdir(i):
167                    shutil.rmtree(i, ignore_errors=True)
168                else:
169                    os.remove(i)
170            del self._to_cleanup[:]
171        else:
172            print(self._to_cleanup)
173
174    def _get_host_tools(self):
175        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_")
176        self._to_cleanup.append(dir_name)
177        with resources.files("apexer_test").joinpath("apexer_test_host_tools.zip").open('rb') as f:
178            with ZipFile(f, 'r') as zip_obj:
179                zip_obj.extractall(path=dir_name)
180
181        files = {}
182        for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
183            "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
184                  "signapk.jar", "android.jar", "blkid", "fsck.erofs", "conv_apex_manifest"]:
185            file_path = os.path.join(dir_name, "bin", i)
186            if os.path.exists(file_path):
187                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR);
188                files[i] = file_path
189            else:
190                files[i] = i
191        self.host_tools = files
192        self.host_tools_path = os.path.join(dir_name, "bin")
193
194        path = self.host_tools_path
195        if "PATH" in os.environ:
196            path += ":" + os.environ["PATH"]
197        os.environ["PATH"] = path
198
199        ld_library_path = os.path.join(dir_name, "lib64")
200        if "LD_LIBRARY_PATH" in os.environ:
201            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
202        if "ANDROID_HOST_OUT" in os.environ:
203            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
204        os.environ["LD_LIBRARY_PATH"] = ld_library_path
205
206    def _extract_resource(self, resource_name):
207        with (
208            resources.files("apexer_test").joinpath(resource_name).open('rb') as f,
209            tempfile.NamedTemporaryFile(prefix=resource_name.replace('/', '_'), delete=False) as f2,
210        ):
211            self._to_cleanup.append(f2.name)
212            shutil.copyfileobj(f, f2)
213            return f2.name
214
215    def _get_container_files(self, apex_file_path):
216        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_")
217        self._to_cleanup.append(dir_name)
218        with ZipFile(apex_file_path, 'r') as zip_obj:
219            zip_obj.extractall(path=dir_name)
220        files = {}
221        for i in ["apex_manifest.json", "apex_manifest.pb",
222                  "apex_build_info.pb", "assets",
223                  "apex_payload.img", "apex_payload.zip"]:
224            file_path = os.path.join(dir_name, i)
225            if os.path.exists(file_path):
226                files[i] = file_path
227        self.assertIn("apex_manifest.pb", files)
228        self.assertIn("apex_build_info.pb", files)
229
230        image_file = None
231        if "apex_payload.img" in files:
232            image_file = files["apex_payload.img"]
233        elif "apex_payload.zip" in files:
234            image_file = files["apex_payload.zip"]
235        self.assertIsNotNone(image_file)
236        files["apex_payload"] = image_file
237
238        return files
239
240    def _extract_payload_from_img(self, img_file_path):
241        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
242        self._to_cleanup.append(dir_name)
243        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
244        run_host_command(cmd)
245
246        # Remove payload files added by apexer and e2fs tools.
247        for i in ["apex_manifest.json", "apex_manifest.pb"]:
248            if os.path.exists(os.path.join(dir_name, i)):
249                os.remove(os.path.join(dir_name, i))
250        if os.path.isdir(os.path.join(dir_name, "lost+found")):
251            shutil.rmtree(os.path.join(dir_name, "lost+found"))
252        return dir_name
253
254    def _extract_payload(self, apex_file_path):
255        dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
256        self._to_cleanup.append(dir_name)
257        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
258               "--blkid_path",self.host_tools["blkid"], "--fsckerofs_path",
259               self.host_tools["fsck.erofs"], "extract", apex_file_path, dir_name]
260        run_host_command(cmd)
261
262        # Remove payload files added by apexer and e2fs tools.
263        for i in ["apex_manifest.json", "apex_manifest.pb"]:
264            if os.path.exists(os.path.join(dir_name, i)):
265                os.remove(os.path.join(dir_name, i))
266        if os.path.isdir(os.path.join(dir_name, "lost+found")):
267            shutil.rmtree(os.path.join(dir_name, "lost+found"))
268        return dir_name
269
270    def _run_apexer(self, container_files, payload_dir, args=[]):
271        unsigned_payload_only = False
272        payload_only = False
273        if "--unsigned_payload_only" in args:
274            unsigned_payload_only = True
275        if unsigned_payload_only or "--payload_only" in args:
276            payload_only = True
277
278        os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
279            ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
280        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
281        if DEBUG_TEST:
282            cmd.append('-v')
283        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
284        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
285        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
286        if "apex_manifest.json" in container_files:
287            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
288        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
289        if not payload_only and "assets" in container_files:
290            cmd.extend(["--assets_dir", container_files["assets"]])
291        if not unsigned_payload_only:
292            cmd.extend(["--key", self._extract_resource(TEST_PRIVATE_KEY)])
293            cmd.extend(["--pubkey", self._extract_resource(TEST_AVB_PUBLIC_KEY)])
294        cmd.extend(args)
295
296        # Decide on output file name
297        apex_suffix = ".apex.unsigned"
298        if payload_only:
299            apex_suffix = ".payload"
300        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
301        os.close(fd)
302        self._to_cleanup.append(fn)
303        cmd.extend([payload_dir, fn])
304
305        run_host_command(cmd)
306        return fn
307
308    def _get_java_toolchain(self):
309        java_toolchain = "java"
310        if os.path.isfile("prebuilts/jdk/jdk17/linux-x86/bin/java"):
311            java_toolchain = "prebuilts/jdk/jdk17/linux-x86/bin/java"
312        elif os.path.isfile("/jdk/jdk17/linux-x86/bin/java"):
313            java_toolchain = "/jdk/jdk17/linux-x86/bin/java"
314        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
315            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
316        elif "ANDROID_JAVA_HOME" in os.environ:
317            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
318        elif "JAVA_HOME" in os.environ:
319            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")
320
321        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
322        if "ANDROID_HOST_OUT" in os.environ:
323            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
324        if "ANDROID_BUILD_TOP" in os.environ:
325            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
326                "out/host/linux-x86/lib64")
327
328        return [java_toolchain, java_dep_lib]
329
330    def _sign_apk_container(self, unsigned_apex):
331        fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex")
332        os.close(fd)
333        self._to_cleanup.append(fn)
334        java_toolchain, java_dep_lib = self._get_java_toolchain()
335        cmd = [
336            java_toolchain,
337            "-Djava.library.path=" + java_dep_lib,
338            "-jar", self.host_tools['signapk.jar'],
339            "-a", "4096", "--align-file-size",
340            self._extract_resource(TEST_X509_KEY),
341            self._extract_resource(TEST_PK8_KEY),
342            unsigned_apex, fn]
343        run_and_check_output(cmd)
344        return fn
345
346    def _sign_payload(self, container_files, unsigned_payload):
347        fd, signed_payload = \
348            tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload")
349        os.close(fd)
350        self._to_cleanup.append(signed_payload)
351        shutil.copyfile(unsigned_payload, signed_payload)
352
353        cmd = ['avbtool']
354        cmd.append('add_hashtree_footer')
355        cmd.append('--do_not_generate_fec')
356        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
357        cmd.extend(['--hash_algorithm', 'sha256'])
358        cmd.extend(['--key', self._extract_resource(TEST_PRIVATE_KEY)])
359        manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"])
360        ValidateApexManifest(manifest_apex)
361        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
362        # Set up the salt based on manifest content which includes name
363        # and version
364        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
365        cmd.extend(['--salt', salt])
366        cmd.extend(['--image', signed_payload])
367        cmd.append('--no_hashtree')
368        run_and_check_output(cmd)
369
370        return signed_payload
371
372    def _verify_payload(self, payload):
373        """Verifies that the payload is properly signed by avbtool"""
374        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
375        run_and_check_output(cmd)
376
377    def _run_build_test(self, apex_name):
378        apex_file_path = self._extract_resource(apex_name + ".apex")
379        if DEBUG_TEST:
380            fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex")
381            os.close(fd)
382            shutil.copyfile(apex_file_path, fn)
383            self._to_cleanup.append(fn)
384        container_files = self._get_container_files(apex_file_path)
385        payload_dir = self._extract_payload(apex_file_path)
386        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
387        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
388        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))
389
390    def test_simple_apex(self):
391        self._run_build_test(TEST_APEX)
392
393    def test_legacy_apex(self):
394        self._run_build_test(TEST_APEX_LEGACY)
395
396    def test_output_payload_only(self):
397        """Assert that payload-only output from apexer is same as the payload we get by unzipping
398        apex.
399        """
400        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
401        container_files = self._get_container_files(apex_file_path)
402        payload_dir = self._extract_payload(apex_file_path)
403        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
404        self._verify_payload(payload_only_file_path)
405        self.assertEqual(get_sha1sum(payload_only_file_path),
406                         get_sha1sum(container_files["apex_payload"]))
407
408    def test_output_unsigned_payload_only(self):
409        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
410        same as the payload we get by unzipping apex.
411        """
412        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
413        container_files = self._get_container_files(apex_file_path)
414        payload_dir = self._extract_payload(apex_file_path)
415        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
416                                                  ["--unsigned_payload_only"])
417        with self.assertRaises(RuntimeError) as error:
418            self._verify_payload(unsigned_payload_only_file_path)
419        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
420        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
421        self.assertEqual(get_sha1sum(signed_payload),
422                         get_sha1sum(container_files["apex_payload"]))
423
424        # Now assert that given an unsigned image and the original container
425        # files, we can produce an identical unsigned image.
426        unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
427        unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
428                                                             ["--unsigned_payload_only"])
429        self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
430                         get_sha1sum(unsigned_payload_only_2_file_path))
431
432    def test_apex_with_logging_parent(self):
433      self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)
434
435    def test_apex_with_overridden_package_name(self):
436      self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)
437
438    def test_conv_apex_manifest(self):
439        # .pb generation from json
440        manifest_json_path = self._extract_resource(TEST_MANIFEST_JSON)
441
442        fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_manifest_", suffix=".pb")
443        os.close(fd)
444        self._to_cleanup.append(fn)
445        cmd = [
446            "conv_apex_manifest",
447            "proto",
448            manifest_json_path,
449            "-o", fn]
450        run_and_check_output(cmd)
451
452        with open(manifest_json_path) as fd_json:
453            manifest_json = json.load(fd_json)
454        manifest_apex = ParseApexManifest(fn)
455        ValidateApexManifest(manifest_apex)
456
457        self.assertEqual(manifest_apex.name, manifest_json["name"])
458        self.assertEqual(manifest_apex.version, manifest_json["version"])
459
460        # setprop check on already generated .pb
461        next_version = 20
462        cmd = [
463            "conv_apex_manifest",
464            "setprop",
465            "version", str(next_version),
466            fn]
467        run_and_check_output(cmd)
468
469        manifest_apex = ParseApexManifest(fn)
470        ValidateApexManifest(manifest_apex)
471
472        self.assertEqual(manifest_apex.version, next_version)
473
474
475
476if __name__ == '__main__':
477    unittest.main(verbosity=2)
478