#!/usr/bin/env python
#
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
"""Common utility functions shared by merge_* scripts.

Expects items in OPTIONS prepared by merge_target_files.py.
"""

import fnmatch
import logging
import os
import re
import shutil
import zipfile

import common

logger = logging.getLogger(__name__)
OPTIONS = common.OPTIONS


def ExtractItems(input_zip, output_dir, extract_item_list):
  """Extracts items in extract_item_list from a zip to a dir.

  Args:
    input_zip: Path to the zip file to extract from.
    output_dir: The directory that the matched items are extracted to.
    extract_item_list: fnmatch-style patterns selecting the entries to extract.
  """
  with zipfile.ZipFile(input_zip, allowZip64=True) as input_zipfile:
    input_namelist = input_zipfile.namelist()

  # Filter the extract_item_list to remove any items that do not exist in the
  # zip file. Otherwise, the extraction step will fail.
  filtered_extract_item_list = [
      pattern for pattern in extract_item_list
      if fnmatch.filter(input_namelist, pattern)
  ]

  common.UnzipToDir(input_zip, output_dir, filtered_extract_item_list)


def CopyItems(from_dir, to_dir, copy_item_list):
  """Copies the items in copy_item_list from source to destination directory.

  copy_item_list may include files and directories. Will copy the matched
  files and create the matched directories. Symbolic links are recreated as
  links (pointing at the same target) rather than being followed.

  Args:
    from_dir: The source directory.
    to_dir: The destination directory.
    copy_item_list: Items to be copied, as fnmatch-style patterns that are
      matched against paths relative to from_dir.
  """
  # Collect every file and directory below from_dir as a relative path.
  item_paths = []
  for root, dirs, files in os.walk(from_dir):
    item_paths.extend(
        os.path.relpath(path=os.path.join(root, item_name), start=from_dir)
        for item_name in files + dirs)

  filtered = set()
  for pattern in copy_item_list:
    filtered.update(fnmatch.filter(item_paths, pattern))

  # Iterate in sorted order so that the copy order is deterministic.
  for item in sorted(filtered):
    original_path = os.path.join(from_dir, item)
    copied_path = os.path.join(to_dir, item)
    copied_parent_path = os.path.dirname(copied_path)
    if not os.path.exists(copied_parent_path):
      os.makedirs(copied_parent_path)
    # Check for links first: a link to a directory is also reported as a
    # directory by os.path.isdir.
    if os.path.islink(original_path):
      os.symlink(os.readlink(original_path), copied_path)
    elif os.path.isdir(original_path):
      if not os.path.exists(copied_path):
        os.makedirs(copied_path)
    else:
      shutil.copyfile(original_path, copied_path)


def GetTargetFilesItems(target_files_zipfile_or_dir):
  """Gets a list of target files items.

  Args:
    target_files_zipfile_or_dir: Path to the target files, either a zip archive
      or a directory.

  Returns:
    The entry names of the zip archive, or the paths of all files and
    directories below the directory, relative to that directory.

  Raises:
    ValueError: If the input is neither a zip file nor a directory.
  """
  if zipfile.is_zipfile(target_files_zipfile_or_dir):
    with zipfile.ZipFile(target_files_zipfile_or_dir, allowZip64=True) as fz:
      return fz.namelist()

  if os.path.isdir(target_files_zipfile_or_dir):
    item_list = []
    for root, dirs, files in os.walk(target_files_zipfile_or_dir):
      item_list.extend(
          os.path.relpath(path=os.path.join(root, item),
                          start=target_files_zipfile_or_dir)
          for item in dirs + files)
    return item_list

  raise ValueError('Target files should be either zipfile or directory.')


def CollectTargetFiles(input_zipfile_or_dir, output_dir, item_list=None):
  """Extracts input zipfile or copy input directory to output directory.

  Extracts the input zipfile if `input_zipfile_or_dir` is a zip archive, or
  copies the items if `input_zipfile_or_dir` is a directory.

  Args:
    input_zipfile_or_dir: The input target files, could be either a zipfile to
      extract or a directory to copy.
    output_dir: The output directory that the input files are either extracted
      or copied.
    item_list: Files to be extracted or copied. Will extract or copy all files
      if omitted.

  Raises:
    ValueError: If the input is neither a zip file nor a directory.
  """
  # An omitted (or empty) item list selects everything.
  patterns = item_list or ('*',)
  if zipfile.is_zipfile(input_zipfile_or_dir):
    ExtractItems(input_zipfile_or_dir, output_dir, patterns)
  elif os.path.isdir(input_zipfile_or_dir):
    CopyItems(input_zipfile_or_dir, output_dir, patterns)
  else:
    raise ValueError('Target files should be either zipfile or directory.')


def WriteSortedData(data, path):
  """Writes the sorted contents of either a list or dict to file.

  This function sorts the contents of the list or dict and then writes the
  resulting sorted contents to a file specified by path. A dict is written as
  one 'key=value' line per entry, sorted by key; a list is written as one entry
  per line.

  Args:
    data: The list or dict to sort and write.
    path: Path to the file to write the sorted values to. The file at path will
      be overridden if it exists.
  """
  is_dict = isinstance(data, dict)
  with open(path, 'w') as output:
    for entry in sorted(data):
      if is_dict:
        output.write('{}={}\n'.format(entry, data[entry]))
      else:
        output.write('{}\n'.format(entry))


def ValidateConfigLists():
  """Performs validations on the merge config lists.

  Reads the framework/vendor item lists and the framework misc info keys from
  OPTIONS and logs an error for every failed check.

  Returns:
    False if a validation fails, otherwise True.
  """
  has_error = False

  # Check that partitions only come from one input.
  framework_partitions = ItemListToPartitionSet(OPTIONS.framework_item_list)
  vendor_partitions = ItemListToPartitionSet(OPTIONS.vendor_item_list)
  from_both = framework_partitions.intersection(vendor_partitions)
  if from_both:
    # Sort the partitions so that the message is stable between runs.
    logger.error(
        'Cannot extract items from the same partition in both the '
        'framework and vendor builds. Please ensure only one merge config '
        'item list (or inferred list) includes each partition: %s',
        ','.join(sorted(from_both)))
    has_error = True

  if any(key in OPTIONS.framework_misc_info_keys
         for key in ('dynamic_partition_list', 'super_partition_groups')):
    logger.error('Dynamic partition misc info keys should come from '
                 'the vendor instance of META/misc_info.txt.')
    has_error = True

  return not has_error


# In an item list (framework or vendor), we may see entries that select whole
# partitions. Such an entry might look like this 'SYSTEM/*' (e.g., for the
# system partition). The first regex below matches this and extracts the
# partition name. The other two extract the partition name from image entries
# such as 'IMAGES/system.img' or 'PREBUILT_IMAGES/system.img'.

_PARTITION_ITEM_PATTERN = re.compile(r'^([A-Z_]+)/.*$')
_IMAGE_PARTITION_PATTERN = re.compile(r'^IMAGES/(.*)\.img$')
_PREBUILT_IMAGE_PARTITION_PATTERN = re.compile(r'^PREBUILT_IMAGES/(.*)\.img$')


def ItemListToPartitionSet(item_list):
  """Converts a target files item list to a partition set.

  The item list contains items that might look like 'SYSTEM/*' or 'VENDOR/*' or
  'OTA/android-info.txt'. Items that end in '/*' are assumed to match entire
  directories where 'SYSTEM' or 'VENDOR' is a directory name that identifies the
  contents of a partition of the same name. Other items in the list, such as the
  'OTA' example contain metadata. This function iterates such a list, returning
  a set that contains the partition entries.

  Args:
    item_list: A list of items in a target files package.

  Returns:
    A set of partitions extracted from the list of items.
  """

  partition_set = set()

  for item in item_list:
    stripped_item = item.strip()
    for pattern in (_PARTITION_ITEM_PATTERN, _IMAGE_PARTITION_PATTERN,
                    _PREBUILT_IMAGE_PARTITION_PATTERN):
      partition_match = pattern.search(stripped_item)
      if not partition_match:
        continue
      partition = partition_match.group(1).lower()
      # These directories in target-files are not actual partitions.
      if partition not in ('meta', 'images', 'prebuilt_images'):
        partition_set.add(partition)

  return partition_set


# Partitions that are grabbed from the framework partial build by default.
_FRAMEWORK_PARTITIONS = {
    'system', 'product', 'system_ext', 'system_other', 'root',
    'vbmeta_system', 'pvmfw'
}


def InferItemList(input_namelist, framework):
  """Infers the item list to take from one of the partial builds.

  Args:
    input_namelist: The names of the entries in the partial build's target
      files. Names that end in '/' are skipped.
    framework: True to infer the list for the framework build, False to infer
      it for the vendor build.

  Returns:
    A sorted list of items, e.g. 'SYSTEM/*', 'META/filesystem_config.txt' or
    'IMAGES/system.img'.
  """
  item_set = set()

  # Some META items are always grabbed from partial builds directly.
  # Others are combined in merge_meta.py.
  if framework:
    item_set.update([
        'META/liblz4.so',
        'META/postinstall_config.txt',
        'META/zucchini_config.txt',
    ])
  else:  # vendor
    item_set.update([
        'META/kernel_configs.txt',
        'META/kernel_version.txt',
        'META/otakeys.txt',
        'META/pack_radioimages.txt',
        'META/releasetools.py',
    ])

  # Grab a set of items for the expected partitions in the partial build.
  seen_partitions = set()
  for name in input_namelist:
    if name.endswith('/'):
      continue

    # The top-level directory of an entry identifies its partition.
    partition = name.split('/')[0].lower()

    # META items are grabbed above, or merged later.
    if partition == 'meta':
      continue

    if partition in ('images', 'prebuilt_images'):
      image_partition, extension = os.path.splitext(os.path.basename(name))
      if image_partition == 'vbmeta':
        # Always regenerate vbmeta.img since it depends on hash information
        # from both builds.
        continue
      if extension in ('.img', '.map'):
        # Include image files in IMAGES/* if the partition comes from
        # the expected set.
        if (framework and image_partition in _FRAMEWORK_PARTITIONS) or (
            not framework and image_partition not in _FRAMEWORK_PARTITIONS):
          item_set.add(name)
      elif not framework:
        # Include all miscellaneous non-image files in IMAGES/* from
        # the vendor build.
        item_set.add(name)
      continue

    # Skip already-visited partitions.
    if partition in seen_partitions:
      continue
    seen_partitions.add(partition)

    if (framework and partition in _FRAMEWORK_PARTITIONS) or (
        not framework and partition not in _FRAMEWORK_PARTITIONS):
      fs_config_prefix = '' if partition == 'system' else '%s_' % partition
      item_set.update([
          '%s/*' % partition.upper(),
          'META/%sfilesystem_config.txt' % fs_config_prefix,
      ])

  return sorted(item_set)


def InferFrameworkMiscInfoKeys(input_namelist):
  """Infers the misc info keys to take from the framework build.

  Starts from a fixed set of keys and adds the per-partition keys of every
  framework partition that has a directory entry in input_namelist, either at
  the top level ('PRODUCT/') or below SYSTEM ('SYSTEM/product/').

  NOTE(review): directory entries are matched with their trailing '/', which is
  how zip name lists spell them. The directory branch of GetTargetFilesItems
  returns relative paths without a trailing slash, so such listings presumably
  never match here -- confirm against the callers.

  Args:
    input_namelist: The names of the entries in the framework target files.

  Returns:
    A sorted list of misc info keys. Each key is listed once.
  """
  keys = [
      'ab_update',
      'avb_vbmeta_system',
      'avb_vbmeta_system_algorithm',
      'avb_vbmeta_system_key_path',
      'avb_vbmeta_system_rollback_index_location',
      'default_system_dev_certificate',
  ]

  names = set(input_namelist)
  for partition in _FRAMEWORK_PARTITIONS:
    partition_dirs = ('%s/' % partition.upper(), 'SYSTEM/%s/' % partition)
    # Add the keys once per partition, even if both directories are present.
    if not any(partition_dir in names for partition_dir in partition_dirs):
      continue
    fs_type_prefix = '' if partition == 'system' else '%s_' % partition
    keys.extend([
        'avb_%s_hashtree_enable' % partition,
        'avb_%s_add_hashtree_footer_args' % partition,
        '%s_disable_sparse' % partition,
        'building_%s_image' % partition,
        '%sfs_type' % fs_type_prefix,
    ])

  return sorted(keys)