1#!/usr/bin/env python3 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Unit tests for apexer_with_DCLA_preprocessing.""" 18import hashlib 19import importlib.resources 20import os 21import shutil 22import stat 23import subprocess 24import tempfile 25from typing import List, BinaryIO 26import unittest 27import zipfile 28 29TEST_PRIVATE_KEY = os.path.join('testdata', 'com.android.example.apex.pem') 30TEST_APEX = 'com.android.example.apex' 31 32# In order to debug test failures, set DEBUG_TEST to True and run the test from 33# local workstation bypassing atest, e.g.: 34# $ m apexer_with_DCLA_preprocessing_test && \ 35# out/host/linux-x86/nativetest64/apexer_with_DCLA_preprocessing_test/\ 36# apexer_with_DCLA_preprocessing_test 37# 38# the test will print out the command used, and the temporary files used by the 39# test. 40DEBUG_TEST = False 41 42def resources(): 43 return importlib.resources.files('apexer_with_DCLA_preprocessing_test') 44 45# TODO: consolidate these common test utilities into a common python_library_host 46# to be shared across tests under system/apex 47def run_command(cmd: List[str]) -> None: 48 """Run a command.""" 49 try: 50 if DEBUG_TEST: 51 cmd_str = ' '.join(cmd) 52 print(f'\nRunning: \n{cmd_str}\n') 53 subprocess.run( 54 cmd, 55 check=True, 56 text=True, 57 stdout=subprocess.PIPE, 58 stderr=subprocess.PIPE) 59 except subprocess.CalledProcessError as err: 60 print(err.stderr) 61 print(err.output) 62 raise err 63 64def get_digest(file_path: str) -> str: 65 """Get sha512 digest of a file """ 66 digester = hashlib.sha512() 67 with open(file_path, 'rb') as f: 68 bytes_to_digest = f.read() 69 digester.update(bytes_to_digest) 70 return digester.hexdigest() 71 72class ApexerWithDCLAPreprocessingTest(unittest.TestCase): 73 74 def setUp(self): 75 self._to_cleanup = [] 76 self.unzip_host_tools() 77 78 def tearDown(self): 79 if not DEBUG_TEST: 80 for i in self._to_cleanup: 81 if os.path.isdir(i): 82 shutil.rmtree(i, ignore_errors=True) 83 else: 84 os.remove(i) 85 del self._to_cleanup[:] 86 else: 87 print('Cleanup: ' + str(self._to_cleanup)) 88 89 def create_temp_dir(self) -> str: 90 tmp_dir = tempfile.mkdtemp() 91 self._to_cleanup.append(tmp_dir) 92 return tmp_dir 93 94 def expand_apex(self, apex_file: str | BinaryIO) -> None: 95 """expand an apex file include apex_payload.""" 96 apex_dir = self.create_temp_dir() 97 with zipfile.ZipFile(apex_file, 'r') as apex_zip: 98 apex_zip.extractall(apex_dir) 99 payload_img = os.path.join(apex_dir, 'apex_payload.img') 100 extract_dir = os.path.join(apex_dir, 'payload_extract') 101 os.mkdir(extract_dir) 102 run_command([self.debugfs_static, payload_img, '-R', f'rdump / {extract_dir}']) 103 104 # remove /etc and /lost+found and /payload_extract/apex_manifest.pb 105 lost_and_found = os.path.join(extract_dir, 'lost+found') 106 etc_dir = os.path.join(extract_dir, 'etc') 107 os.remove(os.path.join(extract_dir, 'apex_manifest.pb')) 108 if os.path.isdir(lost_and_found): 109 shutil.rmtree(lost_and_found) 110 if os.path.isdir(etc_dir): 111 shutil.rmtree(etc_dir) 112 113 return apex_dir 114 115 def unzip_host_tools(self) -> None: 116 host_tools_dir = self.create_temp_dir() 117 with ( 118 resources().joinpath('apexer_test_host_tools.zip').open(mode='rb') as host_tools_zip_resource, 119 resources().joinpath(TEST_PRIVATE_KEY).open(mode='rb') as key_file_resource, 120 resources().joinpath('apexer_with_DCLA_preprocessing').open(mode='rb') as apexer_wrapper_resource, 121 ): 122 with zipfile.ZipFile(host_tools_zip_resource, 'r') as zip_obj: 123 zip_obj.extractall(host_tools_dir) 124 apexer_wrapper = os.path.join(host_tools_dir, 'apexer_with_DCLA_preprocessing') 125 with open(apexer_wrapper, 'wb') as f: 126 shutil.copyfileobj(apexer_wrapper_resource, f) 127 key_file = os.path.join(host_tools_dir, 'key.pem') 128 with open(key_file, 'wb') as f: 129 shutil.copyfileobj(key_file_resource, f) 130 131 132 self.apexer_tool_path = os.path.join(host_tools_dir, 'bin') 133 self.apexer_wrapper = apexer_wrapper 134 self.key_file = key_file 135 self.debugfs_static = os.path.join(host_tools_dir, 'bin/debugfs_static') 136 self.android_jar = os.path.join(host_tools_dir, 'bin/android.jar') 137 self.apexer = os.path.join(host_tools_dir, 'bin/apexer') 138 os.chmod(apexer_wrapper, stat.S_IRUSR | stat.S_IXUSR); 139 for i in ['apexer', 'deapexer', 'avbtool', 'mke2fs', 'sefcontext_compile', 'e2fsdroid', 140 'resize2fs', 'soong_zip', 'aapt2', 'merge_zips', 'zipalign', 'debugfs_static', 141 'signapk.jar', 'android.jar']: 142 file_path = os.path.join(host_tools_dir, 'bin', i) 143 if os.path.exists(file_path): 144 os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR); 145 146 147 def test_DCLA_preprocessing(self): 148 """test DCLA preprocessing done properly.""" 149 with resources().joinpath(TEST_APEX + '.apex').open(mode='rb') as apex_file: 150 apex_dir = self.expand_apex(apex_file) 151 152 # create apex canned_fs_config file, TEST_APEX does not come with one 153 canned_fs_config_file = os.path.join(apex_dir, 'canned_fs_config') 154 with open(canned_fs_config_file, 'w') as f: 155 # add /lib/foo.so file 156 lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib') 157 os.makedirs(lib_dir) 158 foo_file = os.path.join(lib_dir, 'foo.so') 159 with open(foo_file, 'w') as lib_file: 160 lib_file.write('This is a placeholder lib file.') 161 foo_digest = get_digest(foo_file) 162 163 # add /lib dir and /lib/foo.so in canned_fs_config 164 f.write('/lib 0 2000 0755\n') 165 f.write('/lib/foo.so 1000 1000 0644\n') 166 167 # add /lib/bar.so file 168 lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib64') 169 os.makedirs(lib_dir) 170 bar_file = os.path.join(lib_dir, 'bar.so') 171 with open(bar_file, 'w') as lib_file: 172 lib_file.write('This is another placeholder lib file.') 173 bar_digest = get_digest(bar_file) 174 175 # add /lib dir and /lib/foo.so in canned_fs_config 176 f.write('/lib64 0 2000 0755\n') 177 f.write('/lib64/bar.so 1000 1000 0644\n') 178 179 f.write('/ 0 2000 0755\n') 180 f.write('/apex_manifest.pb 1000 1000 0644\n') 181 182 # call apexer_with_DCLA_preprocessing 183 manifest_file = os.path.join(apex_dir, 'apex_manifest.pb') 184 build_info_file = os.path.join(apex_dir, 'apex_build_info.pb') 185 apex_out = os.path.join(apex_dir, 'DCLA_preprocessed_output.apex') 186 run_command([self.apexer_wrapper, 187 '--apexer', self.apexer, 188 '--canned_fs_config', canned_fs_config_file, 189 os.path.join(apex_dir, 'payload_extract'), 190 apex_out, 191 '--', 192 '--android_jar_path', self.android_jar, 193 '--apexer_tool_path', self.apexer_tool_path, 194 '--key', self.key_file, 195 '--manifest', manifest_file, 196 '--build_info', build_info_file, 197 '--payload_fs_type', 'ext4', 198 '--payload_type', 'image', 199 '--force' 200 ]) 201 202 # check the existence of updated canned_fs_config 203 updated_canned_fs_config = os.path.join(apex_dir, 'updated_canned_fs_config') 204 self.assertTrue( 205 os.path.isfile(updated_canned_fs_config), 206 'missing updated canned_fs_config file named updated_canned_fs_config') 207 208 # check the resulting apex, it should have /lib/foo.so/<hash>/foo.so and 209 # /lib64/bar.so/<hash>/bar.so 210 result_apex_dir = self.expand_apex(apex_out) 211 replaced_foo = os.path.join( 212 result_apex_dir, f'payload_extract/lib/foo.so/{foo_digest}/foo.so') 213 replaced_bar = os.path.join( 214 result_apex_dir, f'payload_extract/lib64/bar.so/{bar_digest}/bar.so') 215 self.assertTrue( 216 os.path.isfile(replaced_foo), 217 f'expecting /lib/foo.so/{foo_digest}/foo.so') 218 self.assertTrue( 219 os.path.isfile(replaced_bar), 220 f'expecting /lib64/bar.so/{bar_digest}/bar.so') 221 222if __name__ == '__main__': 223 unittest.main(verbosity=2) 224