1#!/usr/bin/env python
2#
3# Copyright (C) 2022 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16#
17"""Common utility functions shared by merge_* scripts.
18
19Expects items in OPTIONS prepared by merge_target_files.py.
20"""
21
22import fnmatch
23import logging
24import os
25import re
26import shutil
27import zipfile
28
29import common
30
31logger = logging.getLogger(__name__)
32OPTIONS = common.OPTIONS
33
34
35def ExtractItems(input_zip, output_dir, extract_item_list):
36  """Extracts items in extract_item_list from a zip to a dir."""
37
38  # Filter the extract_item_list to remove any items that do not exist in the
39  # zip file. Otherwise, the extraction step will fail.
40
41  with zipfile.ZipFile(input_zip, allowZip64=True) as input_zipfile:
42    input_namelist = input_zipfile.namelist()
43
44  filtered_extract_item_list = []
45  for pattern in extract_item_list:
46    if fnmatch.filter(input_namelist, pattern):
47      filtered_extract_item_list.append(pattern)
48
49  common.UnzipToDir(input_zip, output_dir, filtered_extract_item_list)
50
51
52def CopyItems(from_dir, to_dir, copy_item_list):
53  """Copies the items in copy_item_list from source to destination directory.
54
55  copy_item_list may include files and directories. Will copy the matched
56  files and create the matched directories.
57
58  Args:
59    from_dir: The source directory.
60    to_dir: The destination directory.
61    copy_item_list: Items to be copied.
62  """
63  item_paths = []
64  for root, dirs, files in os.walk(from_dir):
65    item_paths.extend(
66        os.path.relpath(path=os.path.join(root, item_name), start=from_dir)
67        for item_name in files + dirs)
68
69  filtered = set()
70  for pattern in copy_item_list:
71    filtered.update(fnmatch.filter(item_paths, pattern))
72
73  for item in filtered:
74    original_path = os.path.join(from_dir, item)
75    copied_path = os.path.join(to_dir, item)
76    copied_parent_path = os.path.dirname(copied_path)
77    if not os.path.exists(copied_parent_path):
78      os.makedirs(copied_parent_path)
79    if os.path.islink(original_path):
80      os.symlink(os.readlink(original_path), copied_path)
81    elif os.path.isdir(original_path):
82      if not os.path.exists(copied_path):
83        os.makedirs(copied_path)
84    else:
85      shutil.copyfile(original_path, copied_path)
86
87
88def GetTargetFilesItems(target_files_zipfile_or_dir):
89  """Gets a list of target files items."""
90  if zipfile.is_zipfile(target_files_zipfile_or_dir):
91    with zipfile.ZipFile(target_files_zipfile_or_dir, allowZip64=True) as fz:
92      return fz.namelist()
93  elif os.path.isdir(target_files_zipfile_or_dir):
94    item_list = []
95    for root, dirs, files in os.walk(target_files_zipfile_or_dir):
96      item_list.extend(
97          os.path.relpath(path=os.path.join(root, item),
98                          start=target_files_zipfile_or_dir)
99          for item in dirs + files)
100    return item_list
101  else:
102    raise ValueError('Target files should be either zipfile or directory.')
103
104
105def CollectTargetFiles(input_zipfile_or_dir, output_dir, item_list=None):
106  """Extracts input zipfile or copy input directory to output directory.
107
108  Extracts the input zipfile if `input_zipfile_or_dir` is a zip archive, or
109  copies the items if `input_zipfile_or_dir` is a directory.
110
111  Args:
112    input_zipfile_or_dir: The input target files, could be either a zipfile to
113      extract or a directory to copy.
114    output_dir: The output directory that the input files are either extracted
115      or copied.
116    item_list: Files to be extracted or copied. Will extract or copy all files
117      if omitted.
118  """
119  patterns = item_list if item_list else ('*',)
120  if zipfile.is_zipfile(input_zipfile_or_dir):
121    ExtractItems(input_zipfile_or_dir, output_dir, patterns)
122  elif os.path.isdir(input_zipfile_or_dir):
123    CopyItems(input_zipfile_or_dir, output_dir, patterns)
124  else:
125    raise ValueError('Target files should be either zipfile or directory.')
126
127
128def WriteSortedData(data, path):
129  """Writes the sorted contents of either a list or dict to file.
130
131  This function sorts the contents of the list or dict and then writes the
132  resulting sorted contents to a file specified by path.
133
134  Args:
135    data: The list or dict to sort and write.
136    path: Path to the file to write the sorted values to. The file at path will
137      be overridden if it exists.
138  """
139  with open(path, 'w') as output:
140    for entry in sorted(data):
141      out_str = '{}={}\n'.format(entry, data[entry]) if isinstance(
142          data, dict) else '{}\n'.format(entry)
143      output.write(out_str)
144
145
146def ValidateConfigLists():
147  """Performs validations on the merge config lists.
148
149  Returns:
150    False if a validation fails, otherwise true.
151  """
152  has_error = False
153
154  # Check that partitions only come from one input.
155  framework_partitions = ItemListToPartitionSet(OPTIONS.framework_item_list)
156  vendor_partitions = ItemListToPartitionSet(OPTIONS.vendor_item_list)
157  from_both = framework_partitions.intersection(vendor_partitions)
158  if from_both:
159    logger.error(
160        'Cannot extract items from the same partition in both the '
161        'framework and vendor builds. Please ensure only one merge config '
162        'item list (or inferred list) includes each partition: %s' %
163        ','.join(from_both))
164    has_error = True
165
166  if any([
167      key in OPTIONS.framework_misc_info_keys
168      for key in ('dynamic_partition_list', 'super_partition_groups')
169  ]):
170    logger.error('Dynamic partition misc info keys should come from '
171                 'the vendor instance of META/misc_info.txt.')
172    has_error = True
173
174  return not has_error
175
176
177# In an item list (framework or vendor), we may see entries that select whole
178# partitions. Such an entry might look like this 'SYSTEM/*' (e.g., for the
179# system partition). The following regex matches this and extracts the
180# partition name.
181
182_PARTITION_ITEM_PATTERN = re.compile(r'^([A-Z_]+)/.*$')
183_IMAGE_PARTITION_PATTERN = re.compile(r'^IMAGES/(.*)\.img$')
184_PREBUILT_IMAGE_PARTITION_PATTERN = re.compile(r'^PREBUILT_IMAGES/(.*)\.img$')
185
186
187def ItemListToPartitionSet(item_list):
188  """Converts a target files item list to a partition set.
189
190  The item list contains items that might look like 'SYSTEM/*' or 'VENDOR/*' or
191  'OTA/android-info.txt'. Items that end in '/*' are assumed to match entire
192  directories where 'SYSTEM' or 'VENDOR' is a directory name that identifies the
193  contents of a partition of the same name. Other items in the list, such as the
194  'OTA' example contain metadata. This function iterates such a list, returning
195  a set that contains the partition entries.
196
197  Args:
198    item_list: A list of items in a target files package.
199
200  Returns:
201    A set of partitions extracted from the list of items.
202  """
203
204  partition_set = set()
205
206  for item in item_list:
207    for pattern in (_PARTITION_ITEM_PATTERN, _IMAGE_PARTITION_PATTERN, _PREBUILT_IMAGE_PARTITION_PATTERN):
208      partition_match = pattern.search(item.strip())
209      if partition_match:
210        partition = partition_match.group(1).lower()
211        # These directories in target-files are not actual partitions.
212        if partition not in ('meta', 'images', 'prebuilt_images'):
213          partition_set.add(partition)
214
215  return partition_set
216
217
218# Partitions that are grabbed from the framework partial build by default.
219_FRAMEWORK_PARTITIONS = {
220    'system', 'product', 'system_ext', 'system_other', 'root',
221    'vbmeta_system', 'pvmfw'
222}
223
224
225def InferItemList(input_namelist, framework):
226  item_set = set()
227
228  # Some META items are always grabbed from partial builds directly.
229  # Others are combined in merge_meta.py.
230  if framework:
231    item_set.update([
232        'META/liblz4.so',
233        'META/postinstall_config.txt',
234        'META/zucchini_config.txt',
235    ])
236  else:  # vendor
237    item_set.update([
238        'META/kernel_configs.txt',
239        'META/kernel_version.txt',
240        'META/otakeys.txt',
241        'META/pack_radioimages.txt',
242        'META/releasetools.py',
243    ])
244
245  # Grab a set of items for the expected partitions in the partial build.
246  seen_partitions = []
247  for namelist in input_namelist:
248    if namelist.endswith('/'):
249      continue
250
251    partition = namelist.split('/')[0].lower()
252
253    # META items are grabbed above, or merged later.
254    if partition == 'meta':
255      continue
256
257    if partition in ('images', 'prebuilt_images'):
258      image_partition, extension = os.path.splitext(os.path.basename(namelist))
259      if image_partition == 'vbmeta':
260        # Always regenerate vbmeta.img since it depends on hash information
261        # from both builds.
262        continue
263      if extension in ('.img', '.map'):
264        # Include image files in IMAGES/* if the partition comes from
265        # the expected set.
266        if (framework and image_partition in _FRAMEWORK_PARTITIONS) or (
267            not framework and image_partition not in _FRAMEWORK_PARTITIONS):
268          item_set.add(namelist)
269      elif not framework:
270        # Include all miscellaneous non-image files in IMAGES/* from
271        # the vendor build.
272        item_set.add(namelist)
273      continue
274
275    # Skip already-visited partitions.
276    if partition in seen_partitions:
277      continue
278    seen_partitions.append(partition)
279
280    if (framework and partition in _FRAMEWORK_PARTITIONS) or (
281        not framework and partition not in _FRAMEWORK_PARTITIONS):
282      fs_config_prefix = '' if partition == 'system' else '%s_' % partition
283      item_set.update([
284          '%s/*' % partition.upper(),
285          'META/%sfilesystem_config.txt' % fs_config_prefix,
286      ])
287
288  return sorted(item_set)
289
290
291def InferFrameworkMiscInfoKeys(input_namelist):
292  keys = [
293      'ab_update',
294      'avb_vbmeta_system',
295      'avb_vbmeta_system_algorithm',
296      'avb_vbmeta_system_key_path',
297      'avb_vbmeta_system_rollback_index_location',
298      'default_system_dev_certificate',
299  ]
300
301  for partition in _FRAMEWORK_PARTITIONS:
302    for partition_dir in ('%s/' % partition.upper(), 'SYSTEM/%s/' % partition):
303      if partition_dir in input_namelist:
304        fs_type_prefix = '' if partition == 'system' else '%s_' % partition
305        keys.extend([
306            'avb_%s_hashtree_enable' % partition,
307            'avb_%s_add_hashtree_footer_args' % partition,
308            '%s_disable_sparse' % partition,
309            'building_%s_image' % partition,
310            '%sfs_type' % fs_type_prefix,
311        ])
312
313  return sorted(keys)
314