#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Utils for running unittests.
"""

import logging
import os
import os.path
import re
import struct
import sys
import unittest
import zipfile

import avbtool
import common

# Some test runners don't like output on stderr.
logging.basicConfig(stream=sys.stdout)

# Subdirectories (relative to this script's dir) whose test_*.py files are
# picked up, in addition to the script dir itself, when run as a script.
ALLOWED_TEST_SUBDIRS = ('merge',)

# Use ANDROID_BUILD_TOP as an indicator to tell if the needed tools (e.g.
# avbtool, mke2fs) are available while running the tests, unless
# FORCE_RUN_RELEASETOOLS is set to '1'. Not having the required vars means we
# can't run the tests that require external tools.
EXTERNAL_TOOLS_UNAVAILABLE = (
    not os.environ.get('ANDROID_BUILD_TOP') and
    os.environ.get('FORCE_RUN_RELEASETOOLS') != '1')


def SkipIfExternalToolsUnavailable():
  """Decorator function that allows skipping tests per tools availability.

  Returns:
    unittest.skip(...) when EXTERNAL_TOOLS_UNAVAILABLE is true; otherwise an
    identity decorator that returns the decorated function unchanged.
  """
  if EXTERNAL_TOOLS_UNAVAILABLE:
    return unittest.skip('External tools unavailable')
  return lambda func: func


def get_testdata_dir():
  """Returns the 'testdata' dir located next to this script."""
  # The script dir is the one we want, which could be different from pwd.
  return os.path.join(get_current_dir(), 'testdata')


def get_current_dir():
  """Returns the dir that contains this script, with symlinks resolved."""
  # The script dir is the one we want, which could be different from pwd.
  return os.path.dirname(os.path.realpath(__file__))


def get_search_path():
  """Returns the search path that has 'framework/signapk.jar' under.

  Candidates are tried in order: $ANDROID_BUILD_TOP/out/host/linux-x86, then
  paths relative to this script's dir.

  Returns:
    The first candidate dir containing 'framework/signapk.jar', or None if no
    candidate does.
  """

  def signapk_exists(path):
    signapk_path = os.path.realpath(
        os.path.join(path, 'framework', 'signapk.jar'))
    return os.path.exists(signapk_path)

  # Try with ANDROID_BUILD_TOP first.
  full_path = os.path.realpath(os.path.join(
      os.environ.get('ANDROID_BUILD_TOP', ''), 'out', 'host', 'linux-x86'))
  if signapk_exists(full_path):
    return full_path

  # Otherwise try going with relative paths.
  current_dir = get_current_dir()
  for path in (
      # In relative to 'build/make/tools/releasetools' in the Android source.
      ['..'] * 4 + ['out', 'host', 'linux-x86'],
      # Or running the script unpacked from otatools.zip.
      ['..']):
    full_path = os.path.realpath(os.path.join(current_dir, *path))
    if signapk_exists(full_path):
      return full_path
  return None


def append_avb_footer(file_path: str, partition_name: str = ""):
  """Runs `avbtool add_hashtree_footer` (without FEC) on the given image.

  Args:
    file_path: Path of the image, passed to avbtool as --image.
    partition_name: Value passed to avbtool as --partition_name.

  Raises:
    ValueError: If avbtool exits via SystemExit. The original SystemExit is
        attached as the cause.
  """
  args = ["avbtool", "add_hashtree_footer", "--image", file_path,
          "--partition_name", partition_name, "--do_not_generate_fec"]
  avb = avbtool.AvbTool()
  try:
    avb.run(args)
  except SystemExit as err:
    raise ValueError(f"Failed to append hashtree footer {args}") from err


def erase_avb_footer(file_path: str):
  """Runs `avbtool erase_footer` on the given image.

  Args:
    file_path: Path of the image, passed to avbtool as --image.

  Raises:
    ValueError: If avbtool exits via SystemExit. The original SystemExit is
        attached as the cause.
  """
  args = ["avbtool", "erase_footer", "--image", file_path]
  avb = avbtool.AvbTool()
  try:
    avb.run(args)
  except SystemExit as err:
    raise ValueError(f"Failed to erase hashtree footer {args}") from err


def construct_sparse_image(chunks, partition_name: str = ""):
  """Returns a sparse image file constructed from the given chunks.

  From system/core/libsparse/sparse_format.h.
  typedef struct sparse_header {
    __le32 magic;  // 0xed26ff3a
    __le16 major_version;  // (0x1) - reject images with higher major versions
    __le16 minor_version;  // (0x0) - allow images with higher minor versions
    __le16 file_hdr_sz;  // 28 bytes for first revision of the file format
    __le16 chunk_hdr_sz;  // 12 bytes for first revision of the file format
    __le32 blk_sz;  // block size in bytes, must be a multiple of 4 (4096)
    __le32 total_blks;  // total blocks in the non-sparse output image
    __le32 total_chunks;  // total chunks in the sparse input image
    __le32 image_checksum;  // CRC32 checksum of the original data, counting
                            // "don't care" as 0. Standard 802.3 polynomial,
                            // use a Public Domain table implementation
  } sparse_header_t;

  typedef struct chunk_header {
    __le16 chunk_type;  // 0xCAC1 -> raw; 0xCAC2 -> fill;
                        // 0xCAC3 -> don't care
    __le16 reserved1;
    __le32 chunk_sz;  // in blocks in output image
    __le32 total_sz;  // in bytes of chunk input file including chunk header
                      // and data
  } chunk_header_t;

  Args:
    chunks: A list of chunks to be written. Each entry should be a tuple of
        (chunk_type, block_number).
    partition_name: Partition name forwarded to append_avb_footer() for the
        hashtree footer appended to the image.

  Returns:
    Filename of the created sparse image.

  Raises:
    AssertionError: If a chunk has an unsupported chunk type.
    ValueError: If appending the AVB hashtree footer fails.
  """
  SPARSE_HEADER_MAGIC = 0xED26FF3A
  SPARSE_HEADER_FORMAT = "<I4H4I"
  CHUNK_HEADER_FORMAT = "<2H2I"

  CHUNK_TYPE_RAW = 0xCAC1
  CHUNK_TYPE_FILL = 0xCAC2
  CHUNK_TYPE_DONT_CARE = 0xCAC3

  BLOCK_SIZE = 4096
  FILE_HEADER_SIZE = 28
  CHUNK_HEADER_SIZE = 12

  sparse_image = common.MakeTempFile(prefix='sparse-', suffix='.img')
  with open(sparse_image, 'wb') as fp:
    # major_version=1, minor_version=0; image_checksum is left as 0.
    fp.write(struct.pack(
        SPARSE_HEADER_FORMAT, SPARSE_HEADER_MAGIC, 1, 0, FILE_HEADER_SIZE,
        CHUNK_HEADER_SIZE, BLOCK_SIZE,
        sum(chunk[1] for chunk in chunks),
        len(chunks), 0))

    for chunk in chunks:
      if chunk[0] == CHUNK_TYPE_RAW:
        # Raw chunks carry a full payload for every block.
        data_size = BLOCK_SIZE * chunk[1]
      elif chunk[0] == CHUNK_TYPE_FILL:
        # Fill chunks carry 4 bytes of payload regardless of block count.
        data_size = 4
      elif chunk[0] == CHUNK_TYPE_DONT_CARE:
        # Don't-care chunks carry no payload.
        data_size = 0
      else:
        # Raised explicitly (rather than via `assert`) so that the check is
        # not stripped when running under `python -O`.
        raise AssertionError(
            "Unsupported chunk type: {}".format(chunk[0]))

      fp.write(struct.pack(
          CHUNK_HEADER_FORMAT, chunk[0], 0, chunk[1],
          data_size + CHUNK_HEADER_SIZE))
      if data_size != 0:
        # The payload content is random bytes.
        fp.write(os.urandom(data_size))

  append_avb_footer(sparse_image, partition_name)
  return sparse_image


class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.

  It simply pushes the incoming arguments onto script stack, which is to assert
  the calls to EdifyGenerator functions.

  Entries in `lines` are either tuples of (method_name, *args), recorded by
  Mount() and the Assert*() methods, or plain strings, recorded by Comment()
  and AppendExtra().
  """

  def __init__(self, enable_comments=False):
    self.lines = []
    # When False, Comment() calls are dropped instead of being recorded.
    self.enable_comments = enable_comments

  def Mount(self, *args):
    self.lines.append(('Mount',) + args)

  def AssertDevice(self, *args):
    self.lines.append(('AssertDevice',) + args)

  def AssertOemProperty(self, *args):
    self.lines.append(('AssertOemProperty',) + args)

  def AssertFingerprintOrThumbprint(self, *args):
    self.lines.append(('AssertFingerprintOrThumbprint',) + args)

  def AssertSomeFingerprint(self, *args):
    self.lines.append(('AssertSomeFingerprint',) + args)

  def AssertSomeThumbprint(self, *args):
    self.lines.append(('AssertSomeThumbprint',) + args)

  def Comment(self, comment):
    if not self.enable_comments:
      return
    self.lines.append('# {}'.format(comment))

  def AppendExtra(self, extra):
    self.lines.append(extra)

  def __str__(self):
    # `lines` may hold tuples as well as strings; str.join() only accepts
    # strings, so stringify anything else instead of raising TypeError.
    return '\n'.join(
        line if isinstance(line, str) else str(line) for line in self.lines)


class ReleaseToolsTestCase(unittest.TestCase):
  """A common base class for all the releasetools unittests."""

  def tearDown(self):
    # NOTE(review): presumably removes the temp files handed out by
    # common.MakeTempFile() -- confirm against common.Cleanup().
    common.Cleanup()


class PropertyFilesTestCase(ReleaseToolsTestCase):
  """Base class with helpers for tests that check property-files strings."""

  @staticmethod
  def construct_zip_package(entries):
    """Creates a temp zip with one stored (uncompressed) member per entry.

    The content of each member is its name with '.' replaced by '-' and then
    upper-cased.

    Args:
      entries: Iterable of member names.

    Returns:
      Filename of the created zip file.
    """
    zip_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_file, 'w', allowZip64=True) as zip_fp:
      for entry in entries:
        zip_fp.writestr(
            entry,
            entry.replace('.', '-').upper(),
            zipfile.ZIP_STORED)
    return zip_file

  @staticmethod
  def _parse_property_files_string(data):
    """Parses 'name:info,name:info,...' into a dict of name -> info.

    Only the first ':' of each token separates the name from the info, so the
    info part may itself contain ':'.
    """
    result = {}
    for token in data.split(','):
      name, info = token.split(':', 1)
      result[name] = info
    return result

  def setUp(self):
    common.OPTIONS.no_signing = False

  def _verify_entries(self, input_file, tokens, entries):
    """Asserts that each entry's 'offset:size' range holds the expected bytes.

    Args:
      input_file: Path of the file to read from.
      tokens: Dict mapping entry name to an 'offset:size' string.
      entries: Iterable of entry names to verify.
    """
    for entry in entries:
      offset, size = map(int, tokens[entry].split(':'))
      with open(input_file, 'rb') as input_fp:
        input_fp.seek(offset)
        if entry == 'metadata':
          expected = b'META-INF/COM/ANDROID/METADATA'
        elif entry == 'metadata.pb':
          expected = b'META-INF/COM/ANDROID/METADATA-PB'
        else:
          # Matches the content written by construct_zip_package().
          expected = entry.replace('.', '-').upper().encode()
        self.assertEqual(expected, input_fp.read(size))


if __name__ == '__main__':
  # We only want to run tests from the top level directory and from the
  # subdirectories listed in ALLOWED_TEST_SUBDIRS. Unfortunately the pattern
  # option of unittest.discover, internally using fnmatch, doesn't provide a
  # good API to filter the test files based on directory. So we do an os walk
  # and load them manually.
  test_modules = []
  base_path = get_current_dir()
  test_dirs = [base_path] + [
      os.path.join(base_path, subdir) for subdir in ALLOWED_TEST_SUBDIRS
  ]
  for dirpath, _, files in os.walk(base_path):
    for fn in files:
      if dirpath in test_dirs and re.match(r'test_.*\.py$', fn):
        # Strip the '.py' suffix to get the module name.
        test_modules.append(fn[:-3])

  test_suite = unittest.TestLoader().loadTestsFromNames(test_modules)

  # atest needs a verbosity level of >= 2 to correctly parse the result.
  unittest.TextTestRunner(verbosity=2).run(test_suite)