1#!/usr/bin/python3
2#
3# Copyright (C) 2023 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import fnmatch
18import glob
19import os
20import shutil
21import subprocess
22import tempfile
23import zipfile
24
25
def unzip_otatools(otatools_zip_path, output_dir, patterns=None):
  """Unzip otatools to a directory and set the permissions for execution.

  Args:
    otatools_zip_path: The path to otatools zip archive.
    output_dir: The root directory of the unzip output.
    patterns: If provided, only extract files whose archive names match any
              of these fnmatch-style patterns; otherwise, extract all files.
  """
  with zipfile.ZipFile(otatools_zip_path, 'r') as zf:
    if patterns is None:
      zf.extractall(path=output_dir)
    else:
      for name in zf.namelist():
        if any(fnmatch.fnmatch(name, p) for p in patterns):
          zf.extract(name, output_dir)

  # Escape output_dir so that glob metacharacters in the directory name
  # (e.g. '[' or '*') are matched literally; only the trailing '*' is meant
  # to be a wildcard. Without this, the chmod below is silently skipped for
  # such directories.
  bin_glob = os.path.join(glob.escape(output_dir), 'bin', '*')
  for bin_entry in glob.glob(bin_glob):
    os.chmod(bin_entry, 0o777)
45
46
def _parse_copy_file_pair(copy_file_pair):
  """Convert a string to a source path and a destination path.

  The string is split at the first ':' only, so the destination part may
  itself contain ':' characters. The source part is expanded as a glob
  pattern and must match exactly one existing path.

  Args:
    copy_file_pair: A string in the format of <src glob pattern>:<dst path>.

  Returns:
    The source path and the destination path.

  Raises:
    ValueError if the input string is in a wrong format, or if the src glob
    pattern does not match exactly one path.
  """
  split_pair = copy_file_pair.split(':', 1)
  if len(split_pair) != 2:
    raise ValueError(f'{copy_file_pair} is not a <src>:<dst> pair.')
  src_pattern, dst = split_pair
  src_list = glob.glob(src_pattern)
  # Report "nothing matched" separately from "too many matched" so that the
  # error message is not misleading when the pattern matches no file at all.
  if not src_list:
    raise ValueError(f'{copy_file_pair} has no matched src file.')
  if len(src_list) > 1:
    raise ValueError(f'{copy_file_pair} has more than one matched src files: '
                     f'{" ".join(src_list)}.')
  return src_list[0], dst
67
68
def copy_files(copy_files_list, output_dir):
  """Copy each requested file into the output directory.

  Every entry is resolved through _parse_copy_file_pair. Missing parent
  directories of the destination are created before copying, and an existing
  destination is never overwritten.

  Args:
    copy_files_list: A list of copy file pairs, where a pair defines the src
                     glob pattern and the dst path.
    output_dir: The root directory of the copy dst.

  Raises:
    FileExistsError if the dst file already exists.
  """
  for copy_file_pair in copy_files_list:
    src_path, dst_rel_path = _parse_copy_file_pair(copy_file_pair)
    # os.path.join discards output_dir when the dst path is absolute, so an
    # absolute dst is used as is.
    dst_path = os.path.join(output_dir, dst_rel_path)
    dst_parent = os.path.dirname(dst_path)
    os.makedirs(dst_parent, exist_ok=True)
    print(f'Copying {src_path} to {dst_path}')
    if not os.path.exists(dst_path):
      shutil.copyfile(src_path, dst_path)
      continue
    raise FileExistsError(dst_path)
89
90
def _extract_cil_files(target_files_zip, output_dir):
  """Extract sepolicy cil files from a target files zip archive.

  Every archive member whose name ends with '.cil' is extracted, keeping its
  path inside the archive relative to output_dir.

  Args:
    target_files_zip: A path to the target files zip archive.
    output_dir: The directory of extracted cil files.
  """
  with zipfile.ZipFile(target_files_zip, 'r') as archive:
    cil_members = list(
        filter(lambda member: member.endswith('.cil'), archive.namelist()))
    archive.extractall(output_dir, members=cil_members)
102
103
def _get_sepolicy_plat_version(target_files_zip):
  """Get the platform sepolicy version from a vendor target files zip archive.

  The version is the first line of VENDOR/etc/selinux/plat_sepolicy_vers.txt
  with surrounding whitespace removed.

  Args:
    target_files_zip: A path to the target files zip archive.

  Returns:
    A string that represents the platform sepolicy version.

  Raises:
    Any exception raised while opening or reading the version file (e.g.
    KeyError if the archive does not contain it) is re-raised after a
    diagnostic message is printed.
  """
  with zipfile.ZipFile(target_files_zip, 'r') as zf:
    try:
      with zf.open('VENDOR/etc/selinux/plat_sepolicy_vers.txt') as ver_file:
        # Strip all surrounding whitespace, not only '\n': a stray '\r' or
        # trailing blank would otherwise end up in the mapping file names
        # that callers build from this version string.
        return ver_file.readline().decode('utf-8').strip()
    except Exception:
      print(f'cannot get platform sepolicy version from {target_files_zip}')
      raise
120
121
def merge_chd_sepolicy(framework_target_files_zip, vendor_target_files_zip,
                       otatools_dir, output_dir):
  """Merge the sepolicy files for CHD.

  The system and vendor sepolicy cil files of framework_target_files_zip are
  merged with the vendor sepolicy cil files of vendor_target_files_zip by
  running `secilc` from otatools_dir. The result is written to
  `chd_merged_sepolicy` in output_dir.

  Some devices keep the sepolicy rules of a component in a different
  partition than the component itself. Once the device's vendor image is
  replaced with Cuttlefish, CHD would lack those vendor sepolicy rules. As a
  short term solution, the vendor sepolicy files from
  framework_target_files_zip are merged in as well.

  Args:
    framework_target_files_zip: A path to the framework target files zip
                                archive.
    vendor_target_files_zip: A path to the vendor target files zip archive.
    otatools_dir: The otatools directory.
    output_dir: The output directory for generating a merged sepolicy file.
                Temporary extraction directories are created inside it.

  Returns:
    The path to the CHD merged sepolicy file.

  Raises:
    FileNotFoundError if any mandatory sepolicy file is missing.
    subprocess.CalledProcessError if secilc exits with a non-zero status.
  """
  merged_policy = os.path.join(output_dir, 'chd_merged_sepolicy')

  with tempfile.TemporaryDirectory(prefix='framework_',
                                   dir=output_dir) as framework_dir, \
       tempfile.TemporaryDirectory(prefix='vendor_',
                                   dir=output_dir) as vendor_dir:
    _extract_cil_files(framework_target_files_zip, framework_dir)
    _extract_cil_files(vendor_target_files_zip, vendor_dir)
    plat_ver = _get_sepolicy_plat_version(vendor_target_files_zip)
    print(f'Merging sepolicy files from {framework_target_files_zip} and '
          f'{vendor_target_files_zip}: platform version {plat_ver}.')

    # Each entry is (partition, path within the partition, required).
    system_policy_files = [
        ('system', 'etc/selinux/plat_sepolicy.cil', True),
        ('system', f'etc/selinux/mapping/{plat_ver}.cil', True),
        ('system', f'etc/selinux/mapping/{plat_ver}.compat.cil', False),
        ('system_ext', 'etc/selinux/system_ext_sepolicy.cil', False),
        ('system_ext', f'etc/selinux/mapping/{plat_ver}.cil', False),
        ('system_ext', f'etc/selinux/mapping/{plat_ver}.compat.cil', False),
        ('product', 'etc/selinux/product_sepolicy.cil', False),
        ('product', f'etc/selinux/mapping/{plat_ver}.cil', False),
    ]
    vendor_policy_files = [
        ('vendor', 'etc/selinux/vendor_sepolicy.cil', True),
        ('vendor', 'etc/selinux/plat_pub_versioned.cil', True),
        ('odm', 'etc/selinux/odm_sepolicy.cil', False),
    ]

    # The order of this list is the order of the input files on the secilc
    # command line.
    merge_sources = [
        # Normal case: system policies come from the framework target files
        # and vendor policies come from the vendor target files.
        (framework_dir, system_policy_files),
        (vendor_dir, vendor_policy_files),

        # The vendor policies from the framework target files are merged too
        # in order to fix the policy misplaced issue.
        # TODO (b/315474132): remove this when all the policies from
        #                     framework_dir are moved to the right partition.
        (framework_dir, vendor_policy_files),
    ]

    cil_inputs = []
    for extracted_root, candidates in merge_sources:
      for partition, rel_path, is_required in candidates:
        # Partition directories in the target files use upper-case names.
        candidate = os.path.join(extracted_root, partition.upper(), rel_path)
        if not os.path.exists(candidate):
          if is_required:
            raise FileNotFoundError(f'{candidate} does not exist')
          continue
        cil_inputs.append(candidate)

    secilc_cmd = [
        os.path.join(otatools_dir, 'bin', 'secilc'),
        '-m', '-M', 'true', '-G', '-N',
        '-o', merged_policy,
        '-f', '/dev/null'
    ]
    subprocess.run(secilc_cmd + cil_inputs, check=True)
    return merged_policy
208