1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unit tests for apexer_with_DCLA_preprocessing."""
18import hashlib
19import importlib.resources
20import os
21import shutil
22import stat
23import subprocess
24import tempfile
25from typing import List, BinaryIO
26import unittest
27import zipfile
28
# Base name of the prebuilt test APEX; the test opens '<TEST_APEX>.apex' from
# its packaged resources.
TEST_APEX = 'com.android.example.apex'
# Resource path of the private key; the test copies it out of its packaged
# resources and passes the copy to the tool under test via --key.
TEST_PRIVATE_KEY = os.path.join('testdata', 'com.android.example.apex.pem')

# To debug test failures, set DEBUG_TEST to True and run the test from a local
# workstation, bypassing atest, e.g.:
# $ m apexer_with_DCLA_preprocessing_test && \
#   out/host/linux-x86/nativetest64/apexer_with_DCLA_preprocessing_test/\
#   apexer_with_DCLA_preprocessing_test
#
# The test then prints every command it runs and, instead of deleting its
# temporary files, prints where they are.
DEBUG_TEST = False
41
def resources():
  """Return the resource root of the apexer_with_DCLA_preprocessing_test package."""
  package = 'apexer_with_DCLA_preprocessing_test'
  return importlib.resources.files(package)
44
45# TODO: consolidate these common test utilities into a common python_library_host
46# to be shared across tests under system/apex
def run_command(cmd: List[str]) -> None:
  """Run `cmd` as a subprocess, raising if it exits with a non-zero status.

  When DEBUG_TEST is set, the command line is echoed before it is run. The
  child's stdout and stderr are captured and only printed if the command fails.

  Args:
    cmd: the program to execute followed by its arguments.

  Raises:
    subprocess.CalledProcessError: if the command exits with a non-zero
      status; its captured stderr and stdout are printed first.
  """
  if DEBUG_TEST:
    cmd_str = ' '.join(cmd)
    print(f'\nRunning: \n{cmd_str}\n')
  try:
    subprocess.run(cmd, check=True, text=True, capture_output=True)
  except subprocess.CalledProcessError as failure:
    # Surface the captured output, which would otherwise be lost.
    print(failure.stderr)
    print(failure.output)
    raise
63
def get_digest(file_path: str) -> str:
  """Return the SHA-512 digest of a file's contents as a hex string.

  The file is hashed in fixed-size chunks so that memory use stays bounded
  regardless of how large the file is.

  Args:
    file_path: path of the file to hash.

  Returns:
    The SHA-512 digest of the file contents, hex-encoded.

  Raises:
    OSError: if the file cannot be opened or read.
  """
  digester = hashlib.sha512()
  with open(file_path, 'rb') as f:
    # Feed the hash incrementally instead of loading the whole file at once.
    while chunk := f.read(1024 * 1024):
      digester.update(chunk)
  return digester.hexdigest()
71
class ApexerWithDCLAPreprocessingTest(unittest.TestCase):
  """End-to-end test of the apexer_with_DCLA_preprocessing wrapper.

  Before each test, the host tools, the wrapper and the signing key are copied
  out of the test's packaged resources into a temporary directory.
  """

  def setUp(self):
    self._to_cleanup = []
    # unittest does not call tearDown() when setUp() raises, so the cleanup is
    # registered with addCleanup(), which also runs after a failed setUp().
    # Otherwise a failure while unpacking the host tools would leak the
    # temporary directory.
    self.addCleanup(self._remove_temp_paths)
    self.unzip_host_tools()

  def _remove_temp_paths(self) -> None:
    """Delete every path recorded in _to_cleanup, or list them in debug mode."""
    if DEBUG_TEST:
      # Leave the files in place for inspection and say where they are.
      print('Cleanup: ' + str(self._to_cleanup))
      return
    for path in self._to_cleanup:
      if os.path.isdir(path):
        shutil.rmtree(path, ignore_errors=True)
      else:
        os.remove(path)
    del self._to_cleanup[:]

  def create_temp_dir(self) -> str:
    """Create a temporary directory and record it for cleanup.

    Returns:
      The path of the newly created directory.
    """
    tmp_dir = tempfile.mkdtemp()
    self._to_cleanup.append(tmp_dir)
    return tmp_dir

  def expand_apex(self, apex_file: str | BinaryIO) -> str:
    """Unzip an APEX and dump its payload image into a temporary directory.

    The payload image is dumped with debugfs_static into
    <apex_dir>/payload_extract; apex_manifest.pb, lost+found and etc are then
    removed from the dumped payload.

    Args:
      apex_file: path of the APEX file, or a binary file object opened on it.

    Returns:
      The temporary directory holding the unzipped APEX and its
      payload_extract subdirectory.
    """
    apex_dir = self.create_temp_dir()
    with zipfile.ZipFile(apex_file, 'r') as apex_zip:
      apex_zip.extractall(apex_dir)
    payload_img = os.path.join(apex_dir, 'apex_payload.img')
    extract_dir = os.path.join(apex_dir, 'payload_extract')
    os.mkdir(extract_dir)
    run_command([self.debugfs_static, payload_img, '-R', f'rdump / {extract_dir}'])

    # remove /etc and /lost+found and /payload_extract/apex_manifest.pb
    lost_and_found = os.path.join(extract_dir, 'lost+found')
    etc_dir = os.path.join(extract_dir, 'etc')
    os.remove(os.path.join(extract_dir, 'apex_manifest.pb'))
    if os.path.isdir(lost_and_found):
      shutil.rmtree(lost_and_found)
    if os.path.isdir(etc_dir):
      shutil.rmtree(etc_dir)

    return apex_dir

  def unzip_host_tools(self) -> None:
    """Unpack the host tools, the wrapper and the key into a temp directory.

    Sets self.apexer_tool_path, self.apexer_wrapper, self.key_file,
    self.debugfs_static, self.android_jar and self.apexer to paths inside that
    directory.
    """
    host_tools_dir = self.create_temp_dir()
    with (
      resources().joinpath('apexer_test_host_tools.zip').open(mode='rb') as host_tools_zip_resource,
      resources().joinpath(TEST_PRIVATE_KEY).open(mode='rb') as key_file_resource,
      resources().joinpath('apexer_with_DCLA_preprocessing').open(mode='rb') as apexer_wrapper_resource,
    ):
      with zipfile.ZipFile(host_tools_zip_resource, 'r') as zip_obj:
        zip_obj.extractall(host_tools_dir)
      apexer_wrapper = os.path.join(host_tools_dir, 'apexer_with_DCLA_preprocessing')
      with open(apexer_wrapper, 'wb') as f:
        shutil.copyfileobj(apexer_wrapper_resource, f)
      key_file = os.path.join(host_tools_dir, 'key.pem')
      with open(key_file, 'wb') as f:
        shutil.copyfileobj(key_file_resource, f)

    bin_dir = os.path.join(host_tools_dir, 'bin')
    self.apexer_tool_path = bin_dir
    self.apexer_wrapper = apexer_wrapper
    self.key_file = key_file
    self.debugfs_static = os.path.join(bin_dir, 'debugfs_static')
    self.android_jar = os.path.join(bin_dir, 'android.jar')
    self.apexer = os.path.join(bin_dir, 'apexer')

    # The files were written without the executable bit; make the wrapper and
    # whichever of the listed tools exist readable and executable by the owner.
    os.chmod(apexer_wrapper, stat.S_IRUSR | stat.S_IXUSR)
    for i in ['apexer', 'deapexer', 'avbtool', 'mke2fs', 'sefcontext_compile', 'e2fsdroid',
      'resize2fs', 'soong_zip', 'aapt2', 'merge_zips', 'zipalign', 'debugfs_static',
      'signapk.jar', 'android.jar']:
      file_path = os.path.join(bin_dir, i)
      if os.path.exists(file_path):
        os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)

  @staticmethod
  def _add_placeholder_lib(payload_dir: str, lib_dir_name: str, lib_name: str,
                           content: str) -> str:
    """Create <payload_dir>/<lib_dir_name>/<lib_name> and return its digest."""
    lib_dir = os.path.join(payload_dir, lib_dir_name)
    os.makedirs(lib_dir)
    lib_path = os.path.join(lib_dir, lib_name)
    with open(lib_path, 'w') as lib_file:
      lib_file.write(content)
    return get_digest(lib_path)

  def test_DCLA_preprocessing(self):
    """test DCLA preprocessing done properly."""
    with resources().joinpath(TEST_APEX + '.apex').open(mode='rb') as apex_file:
      apex_dir = self.expand_apex(apex_file)
    payload_dir = os.path.join(apex_dir, 'payload_extract')

    # add /lib/foo.so and /lib64/bar.so placeholder files to the payload
    foo_digest = self._add_placeholder_lib(
        payload_dir, 'lib', 'foo.so', 'This is a placeholder lib file.')
    bar_digest = self._add_placeholder_lib(
        payload_dir, 'lib64', 'bar.so', 'This is another placeholder lib file.')

    # create apex canned_fs_config file, TEST_APEX does not come with one
    canned_fs_config_file = os.path.join(apex_dir, 'canned_fs_config')
    with open(canned_fs_config_file, 'w') as f:
      # add /lib dir and /lib/foo.so in canned_fs_config
      f.write('/lib 0 2000 0755\n')
      f.write('/lib/foo.so 1000 1000 0644\n')

      # add /lib64 dir and /lib64/bar.so in canned_fs_config
      f.write('/lib64 0 2000 0755\n')
      f.write('/lib64/bar.so 1000 1000 0644\n')

      f.write('/ 0 2000 0755\n')
      f.write('/apex_manifest.pb 1000 1000 0644\n')

    # call apexer_with_DCLA_preprocessing
    manifest_file = os.path.join(apex_dir, 'apex_manifest.pb')
    build_info_file = os.path.join(apex_dir, 'apex_build_info.pb')
    apex_out = os.path.join(apex_dir, 'DCLA_preprocessed_output.apex')
    run_command([self.apexer_wrapper,
                 '--apexer', self.apexer,
                 '--canned_fs_config', canned_fs_config_file,
                 payload_dir,
                 apex_out,
                 '--',
                 '--android_jar_path', self.android_jar,
                 '--apexer_tool_path', self.apexer_tool_path,
                 '--key', self.key_file,
                 '--manifest', manifest_file,
                 '--build_info', build_info_file,
                 '--payload_fs_type', 'ext4',
                 '--payload_type', 'image',
                 '--force'
                 ])

    # check the existence of updated canned_fs_config
    updated_canned_fs_config = os.path.join(apex_dir, 'updated_canned_fs_config')
    self.assertTrue(
        os.path.isfile(updated_canned_fs_config),
        'missing updated canned_fs_config file named updated_canned_fs_config')

    # check the resulting apex, it should have /lib/foo.so/<hash>/foo.so and
    # /lib64/bar.so/<hash>/bar.so
    result_apex_dir = self.expand_apex(apex_out)
    replaced_foo = os.path.join(
        result_apex_dir, f'payload_extract/lib/foo.so/{foo_digest}/foo.so')
    replaced_bar = os.path.join(
        result_apex_dir, f'payload_extract/lib64/bar.so/{bar_digest}/bar.so')
    self.assertTrue(
        os.path.isfile(replaced_foo),
        f'expecting /lib/foo.so/{foo_digest}/foo.so')
    self.assertTrue(
        os.path.isfile(replaced_bar),
        f'expecting /lib64/bar.so/{bar_digest}/bar.so')
221
if __name__ == '__main__':
  # Run the tests with verbose output when this file is executed directly.
  unittest.main(verbosity=2)
224