1#!/usr/bin/env python
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""
19Utils for running unittests.
20"""
21
22import avbtool
23import logging
24import os
25import os.path
26import re
27import struct
28import sys
29import unittest
30import zipfile
31
32import common
33
# Some test runners don't like outputs from stderr, so route logging to stdout.
logging.basicConfig(stream=sys.stdout)

# Subdirectories (relative to this script) that are also scanned for test_*.py
# files by the test discovery code at the bottom of this file.
ALLOWED_TEST_SUBDIRS = ('merge',)

# Use ANDROID_BUILD_TOP as an indicator to tell if the needed tools (e.g.
# avbtool, mke2fs) are available while running the tests, unless
# FORCE_RUN_RELEASETOOLS is set to '1'. Not having the required vars means we
# can't run the tests that require external tools.
EXTERNAL_TOOLS_UNAVAILABLE = (
    not os.environ.get('ANDROID_BUILD_TOP') and
    os.environ.get('FORCE_RUN_RELEASETOOLS') != '1')
46
47
def SkipIfExternalToolsUnavailable():
  """Returns a decorator that skips tests when external tools are missing.

  When the required tools (avbtool, mke2fs, ...) are available, the returned
  decorator is a no-op and leaves the test function untouched.
  """
  if not EXTERNAL_TOOLS_UNAVAILABLE:
    # Tools are present; decorate with the identity function.
    return lambda func: func
  return unittest.skip('External tools unavailable')
53
54
def get_testdata_dir():
  """Returns the 'testdata' directory that lives next to this script."""
  # Anchor on the script's own location, which may differ from the cwd.
  script_dir = os.path.dirname(os.path.realpath(__file__))
  return os.path.join(script_dir, 'testdata')
60
61
def get_current_dir():
  """Returns the directory containing this script (not necessarily the cwd)."""
  return os.path.dirname(os.path.realpath(__file__))
67
68
def get_search_path():
  """Returns the search path that has 'framework/signapk.jar' under."""

  def has_signapk(base):
    jar_path = os.path.realpath(
        os.path.join(base, 'framework', 'signapk.jar'))
    return os.path.exists(jar_path)

  # First choice: the host output dir under ANDROID_BUILD_TOP.
  build_top = os.environ.get('ANDROID_BUILD_TOP', '')
  candidate = os.path.realpath(
      os.path.join(build_top, 'out', 'host', 'linux-x86'))
  if has_signapk(candidate):
    return candidate

  # Otherwise try paths relative to this script's location.
  script_dir = os.path.dirname(os.path.realpath(__file__))
  relative_candidates = (
      # Relative to 'build/make/tools/releasetools' in the Android source.
      ['..'] * 4 + ['out', 'host', 'linux-x86'],
      # Or running the script unpacked from otatools.zip.
      ['..'],
  )
  for parts in relative_candidates:
    candidate = os.path.realpath(os.path.join(script_dir, *parts))
    if has_signapk(candidate):
      return candidate
  return None
94
95
def append_avb_footer(file_path: str, partition_name: str = ""):
  """Appends an AVB hashtree footer to the given image, in place.

  Args:
    file_path: Path to the image file to modify.
    partition_name: Partition name to record in the footer.

  Raises:
    ValueError: If avbtool fails (avbtool signals errors via SystemExit).
  """
  avb = avbtool.AvbTool()
  args = ["avbtool", "add_hashtree_footer", "--image", file_path,
          "--partition_name", partition_name, "--do_not_generate_fec"]
  try:
    avb.run(args)
  except SystemExit as e:
    # Chain the original SystemExit so the root cause stays visible.
    raise ValueError(f"Failed to append hashtree footer {args}") from e
104
105
def erase_avb_footer(file_path: str):
  """Erases any AVB footer from the given image, in place.

  Args:
    file_path: Path to the image file to modify.

  Raises:
    ValueError: If avbtool fails (avbtool signals errors via SystemExit).
  """
  avb = avbtool.AvbTool()
  args = ["avbtool", "erase_footer", "--image", file_path]
  try:
    avb.run(args)
  except SystemExit as e:
    # Chain the original SystemExit so the root cause stays visible.
    raise ValueError(f"Failed to erase hashtree footer {args}") from e
113
114
def construct_sparse_image(chunks, partition_name: str = ""):
  """Returns a sparse image file constructed from the given chunks.

  From system/core/libsparse/sparse_format.h.
  typedef struct sparse_header {
    __le32 magic;  // 0xed26ff3a
    __le16 major_version;  // (0x1) - reject images with higher major versions
    __le16 minor_version;  // (0x0) - allow images with higher minor versions
    __le16 file_hdr_sz;  // 28 bytes for first revision of the file format
    __le16 chunk_hdr_sz;  // 12 bytes for first revision of the file format
    __le32 blk_sz;  // block size in bytes, must be a multiple of 4 (4096)
    __le32 total_blks;  // total blocks in the non-sparse output image
    __le32 total_chunks;  // total chunks in the sparse input image
    __le32 image_checksum;  // CRC32 checksum of the original data, counting
                            // "don't care" as 0. Standard 802.3 polynomial,
                            // use a Public Domain table implementation
  } sparse_header_t;

  typedef struct chunk_header {
    __le16 chunk_type;  // 0xCAC1 -> raw; 0xCAC2 -> fill;
                        // 0xCAC3 -> don't care
    __le16 reserved1;
    __le32 chunk_sz;  // in blocks in output image
    __le32 total_sz;  // in bytes of chunk input file including chunk header
                      // and data
  } chunk_header_t;

  Args:
    chunks: A list of chunks to be written. Each entry should be a tuple of
        (chunk_type, block_number).

  Returns:
    Filename of the created sparse image.
  """
  SPARSE_HEADER_MAGIC = 0xED26FF3A
  SPARSE_HEADER_FORMAT = "<I4H4I"
  CHUNK_HEADER_FORMAT = "<2H2I"

  sparse_image = common.MakeTempFile(prefix='sparse-', suffix='.img')
  total_blocks = sum(num_blocks for _, num_blocks in chunks)
  with open(sparse_image, 'wb') as fp:
    # Header: magic, major 1, minor 0, 28-byte file header, 12-byte chunk
    # header, 4096-byte blocks, total blocks, total chunks, checksum 0.
    fp.write(struct.pack(
        SPARSE_HEADER_FORMAT, SPARSE_HEADER_MAGIC, 1, 0, 28, 12, 4096,
        total_blocks, len(chunks), 0))

    for chunk_type, num_blocks in chunks:
      if chunk_type == 0xCAC1:
        # Raw chunk: payload is num_blocks full blocks of data.
        data_size = 4096 * num_blocks
      elif chunk_type == 0xCAC2:
        # Fill chunk: payload is a single 4-byte fill pattern.
        data_size = 4
      elif chunk_type == 0xCAC3:
        # Don't-care chunk: no payload.
        data_size = 0
      else:
        assert False, "Unsupported chunk type: {}".format(chunk_type)

      fp.write(struct.pack(
          CHUNK_HEADER_FORMAT, chunk_type, 0, num_blocks, data_size + 12))
      if data_size:
        fp.write(os.urandom(data_size))

  append_avb_footer(sparse_image, partition_name)
  return sparse_image
178
179
class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.

  Each mocked call pushes a tuple of (method name, *args) onto the script
  stack, so tests can assert exactly which EdifyGenerator calls were made.
  """

  def __init__(self, enable_comments=False):
    self.lines = []
    self.enable_comments = enable_comments

  def _record(self, name, args):
    # Store the call as a (name, arg1, arg2, ...) tuple.
    self.lines.append((name,) + args)

  def Mount(self, *args):
    self._record('Mount', args)

  def AssertDevice(self, *args):
    self._record('AssertDevice', args)

  def AssertOemProperty(self, *args):
    self._record('AssertOemProperty', args)

  def AssertFingerprintOrThumbprint(self, *args):
    self._record('AssertFingerprintOrThumbprint', args)

  def AssertSomeFingerprint(self, *args):
    self._record('AssertSomeFingerprint', args)

  def AssertSomeThumbprint(self, *args):
    self._record('AssertSomeThumbprint', args)

  def Comment(self, comment):
    # Comments are recorded (as plain strings) only when enabled.
    if self.enable_comments:
      self.lines.append('# {}'.format(comment))

  def AppendExtra(self, extra):
    self.lines.append(extra)

  def __str__(self):
    return '\n'.join(self.lines)
219
220
class ReleaseToolsTestCase(unittest.TestCase):
  """A common base class for all the releasetools unittests."""

  def tearDown(self):
    # Remove temp files/dirs created via common.MakeTempFile/MakeTempDir
    # during the test run.
    common.Cleanup()
226
227
class PropertyFilesTestCase(ReleaseToolsTestCase):
  """Base class for tests that verify property-files entries in a zip."""

  @staticmethod
  def construct_zip_package(entries):
    """Creates a temp zip; each entry's content is derived from its name.

    Args:
      entries: Iterable of entry names to write into the zip.

    Returns:
      Filename of the created zip package.
    """
    zip_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_file, 'w', allowZip64=True) as zip_fp:
      for entry in entries:
        # Store uncompressed so offsets and sizes are predictable.
        zip_fp.writestr(
            entry,
            entry.replace('.', '-').upper(),
            zipfile.ZIP_STORED)
    return zip_file

  @staticmethod
  def _parse_property_files_string(data):
    """Parses 'name:offset:size,...' into a dict of name -> 'offset:size'."""
    result = {}
    for token in data.split(','):
      name, info = token.split(':', 1)
      result[name] = info
    return result

  def setUp(self):
    common.OPTIONS.no_signing = False

  def _verify_entries(self, input_file, tokens, entries):
    """Asserts each entry's bytes at its recorded offset match expectations.

    Args:
      input_file: Path to the zip package to read.
      tokens: Dict mapping entry name to 'offset:size' strings.
      entries: Entry names to verify.
    """
    # Open the file once and seek per entry, instead of reopening it for
    # every entry inside the loop.
    with open(input_file, 'rb') as input_fp:
      for entry in entries:
        offset, size = map(int, tokens[entry].split(':'))
        input_fp.seek(offset)
        if entry == 'metadata':
          expected = b'META-INF/COM/ANDROID/METADATA'
        elif entry == 'metadata.pb':
          expected = b'META-INF/COM/ANDROID/METADATA-PB'
        else:
          expected = entry.replace('.', '-').upper().encode()
        self.assertEqual(expected, input_fp.read(size))
264
265
if __name__ == '__main__':
  # We only want to run tests from the top level directory (plus the allowed
  # subdirs). Unfortunately the pattern option of unittest.discover,
  # internally using fnmatch, doesn't provide a good API to filter the test
  # files based on directory. So we do an os walk and load them manually.
  base_path = os.path.dirname(os.path.realpath(__file__))
  test_dirs = {base_path}
  test_dirs.update(
      os.path.join(base_path, subdir) for subdir in ALLOWED_TEST_SUBDIRS)

  test_modules = []
  for dirpath, _, files in os.walk(base_path):
    if dirpath not in test_dirs:
      continue
    for fn in files:
      if re.match(r'test_.*\.py$', fn):
        test_modules.append(fn[:-3])

  test_suite = unittest.TestLoader().loadTestsFromNames(test_modules)

  # atest needs a verbosity level of >= 2 to correctly parse the result.
  unittest.TextTestRunner(verbosity=2).run(test_suite)
285