1#!/usr/bin/env python3 2# 3# Copyright 2021, The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Repacks the boot image. 18 19Unpacks the boot image and the ramdisk inside, then add files into 20the ramdisk to repack the boot image. 21""" 22 23import argparse 24import datetime 25import enum 26import glob 27import os 28import shlex 29import shutil 30import subprocess 31import tempfile 32 33 34class TempFileManager: 35 """Manages temporary files and dirs.""" 36 37 def __init__(self): 38 self._temp_files = [] 39 40 def __del__(self): 41 """Removes temp dirs and files.""" 42 for f in self._temp_files: 43 if os.path.isdir(f): 44 shutil.rmtree(f, ignore_errors=True) 45 else: 46 os.remove(f) 47 48 def make_temp_dir(self, prefix='tmp', suffix=''): 49 """Makes a temporary dir that will be cleaned up in the destructor. 50 51 Returns: 52 The absolute pathname of the new directory. 53 """ 54 dir_name = tempfile.mkdtemp(prefix=prefix, suffix=suffix) 55 self._temp_files.append(dir_name) 56 return dir_name 57 58 def make_temp_file(self, prefix='tmp', suffix=''): 59 """Make a temp file that will be deleted in the destructor. 60 61 Returns: 62 The absolute pathname of the new file. 63 """ 64 fd, file_name = tempfile.mkstemp(prefix=prefix, suffix=suffix) 65 os.close(fd) 66 self._temp_files.append(file_name) 67 return file_name 68 69 70class RamdiskFormat(enum.Enum): 71 """Enum class for different ramdisk compression formats.""" 72 LZ4 = 1 73 GZIP = 2 74 75 76class BootImageType(enum.Enum): 77 """Enum class for different boot image types.""" 78 BOOT_IMAGE = 1 79 VENDOR_BOOT_IMAGE = 2 80 SINGLE_RAMDISK_FRAGMENT = 3 81 MULTIPLE_RAMDISK_FRAGMENTS = 4 82 83 84class RamdiskImage: 85 """A class that supports packing/unpacking a ramdisk.""" 86 def __init__(self, ramdisk_img, unpack=True): 87 self._ramdisk_img = ramdisk_img 88 self._ramdisk_format = None 89 self._ramdisk_dir = None 90 self._temp_file_manager = TempFileManager() 91 92 if unpack: 93 self._unpack_ramdisk() 94 else: 95 self._ramdisk_dir = self._temp_file_manager.make_temp_dir( 96 suffix='_new_ramdisk') 97 98 def _unpack_ramdisk(self): 99 """Unpacks the ramdisk.""" 100 self._ramdisk_dir = self._temp_file_manager.make_temp_dir( 101 suffix='_' + os.path.basename(self._ramdisk_img)) 102 103 # The compression format might be in 'lz4' or 'gzip' format, 104 # trying lz4 first. 105 for compression_type, compression_util in [ 106 (RamdiskFormat.LZ4, 'lz4'), 107 (RamdiskFormat.GZIP, 'gzip')]: 108 109 # Command arguments: 110 # -d: decompression 111 # -c: write to stdout 112 decompression_cmd = [ 113 compression_util, '-d', '-c', self._ramdisk_img] 114 115 decompressed_result = subprocess.run( 116 decompression_cmd, check=False, capture_output=True) 117 118 if decompressed_result.returncode == 0: 119 self._ramdisk_format = compression_type 120 break 121 122 if self._ramdisk_format is not None: 123 # toybox cpio arguments: 124 # -i: extract files from stdin 125 # -d: create directories if needed 126 # -u: override existing files 127 subprocess.run( 128 ['toybox', 'cpio', '-idu'], check=True, 129 input=decompressed_result.stdout, cwd=self._ramdisk_dir) 130 131 print(f"=== Unpacked ramdisk: '{self._ramdisk_img}' at " 132 f"'{self._ramdisk_dir}' ===") 133 else: 134 raise RuntimeError('Failed to decompress ramdisk.') 135 136 def repack_ramdisk(self, out_ramdisk_file): 137 """Repacks a ramdisk from self._ramdisk_dir. 138 139 Args: 140 out_ramdisk_file: the output ramdisk file to save. 141 """ 142 compression_cmd = ['lz4', '-l', '-12', '--favor-decSpeed'] 143 if self._ramdisk_format == RamdiskFormat.GZIP: 144 compression_cmd = ['gzip'] 145 146 print('Repacking ramdisk, which might take a few seconds ...') 147 148 mkbootfs_result = subprocess.run( 149 ['mkbootfs', self._ramdisk_dir], check=True, capture_output=True) 150 151 with open(out_ramdisk_file, 'w') as output_fd: 152 subprocess.run(compression_cmd, check=True, 153 input=mkbootfs_result.stdout, stdout=output_fd) 154 155 print("=== Repacked ramdisk: '{}' ===".format(out_ramdisk_file)) 156 157 @property 158 def ramdisk_dir(self): 159 """Returns the internal ramdisk dir.""" 160 return self._ramdisk_dir 161 162 163class BootImage: 164 """A class that supports packing/unpacking a boot.img and ramdisk.""" 165 166 def __init__(self, bootimg): 167 self._bootimg = bootimg 168 self._bootimg_dir = None 169 self._bootimg_type = None 170 self._ramdisk = None 171 self._previous_mkbootimg_args = [] 172 self._temp_file_manager = TempFileManager() 173 174 self._unpack_bootimg() 175 176 def _get_vendor_ramdisks(self): 177 """Returns a list of vendor ramdisks after unpack.""" 178 return sorted(glob.glob( 179 os.path.join(self._bootimg_dir, 'vendor_ramdisk*'))) 180 181 def _unpack_bootimg(self): 182 """Unpacks the boot.img and the ramdisk inside.""" 183 self._bootimg_dir = self._temp_file_manager.make_temp_dir( 184 suffix='_' + os.path.basename(self._bootimg)) 185 186 # Unpacks the boot.img first. 187 unpack_bootimg_cmds = [ 188 'unpack_bootimg', 189 '--boot_img', self._bootimg, 190 '--out', self._bootimg_dir, 191 '--format=mkbootimg', 192 ] 193 result = subprocess.run(unpack_bootimg_cmds, check=True, 194 capture_output=True, encoding='utf-8') 195 self._previous_mkbootimg_args = shlex.split(result.stdout) 196 print("=== Unpacked boot image: '{}' ===".format(self._bootimg)) 197 198 # From the output dir, checks there is 'ramdisk' or 'vendor_ramdisk'. 199 ramdisk = os.path.join(self._bootimg_dir, 'ramdisk') 200 vendor_ramdisk = os.path.join(self._bootimg_dir, 'vendor_ramdisk') 201 vendor_ramdisks = self._get_vendor_ramdisks() 202 if os.path.exists(ramdisk): 203 self._ramdisk = RamdiskImage(ramdisk) 204 self._bootimg_type = BootImageType.BOOT_IMAGE 205 elif os.path.exists(vendor_ramdisk): 206 self._ramdisk = RamdiskImage(vendor_ramdisk) 207 self._bootimg_type = BootImageType.VENDOR_BOOT_IMAGE 208 elif len(vendor_ramdisks) == 1: 209 self._ramdisk = RamdiskImage(vendor_ramdisks[0]) 210 self._bootimg_type = BootImageType.SINGLE_RAMDISK_FRAGMENT 211 elif len(vendor_ramdisks) > 1: 212 # Creates an empty RamdiskImage() below, without unpack. 213 # We'll then add files into this newly created ramdisk, then pack 214 # it with other vendor ramdisks together. 215 self._ramdisk = RamdiskImage(ramdisk_img=None, unpack=False) 216 self._bootimg_type = BootImageType.MULTIPLE_RAMDISK_FRAGMENTS 217 else: 218 raise RuntimeError('Both ramdisk and vendor_ramdisk do not exist.') 219 220 def repack_bootimg(self): 221 """Repacks the ramdisk and rebuild the boot.img""" 222 223 new_ramdisk = self._temp_file_manager.make_temp_file( 224 prefix='ramdisk-patched') 225 self._ramdisk.repack_ramdisk(new_ramdisk) 226 227 mkbootimg_cmd = ['mkbootimg'] 228 229 # Uses previous mkbootimg args, e.g., --vendor_cmdline, --dtb_offset. 230 mkbootimg_cmd.extend(self._previous_mkbootimg_args) 231 232 ramdisk_option = '' 233 if self._bootimg_type == BootImageType.BOOT_IMAGE: 234 ramdisk_option = '--ramdisk' 235 mkbootimg_cmd.extend(['--output', self._bootimg]) 236 elif self._bootimg_type == BootImageType.VENDOR_BOOT_IMAGE: 237 ramdisk_option = '--vendor_ramdisk' 238 mkbootimg_cmd.extend(['--vendor_boot', self._bootimg]) 239 elif self._bootimg_type == BootImageType.SINGLE_RAMDISK_FRAGMENT: 240 ramdisk_option = '--vendor_ramdisk_fragment' 241 mkbootimg_cmd.extend(['--vendor_boot', self._bootimg]) 242 elif self._bootimg_type == BootImageType.MULTIPLE_RAMDISK_FRAGMENTS: 243 mkbootimg_cmd.extend(['--ramdisk_type', 'PLATFORM']) 244 ramdisk_name = ( 245 'RAMDISK_' + 246 datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')) 247 mkbootimg_cmd.extend(['--ramdisk_name', ramdisk_name]) 248 mkbootimg_cmd.extend(['--vendor_ramdisk_fragment', new_ramdisk]) 249 mkbootimg_cmd.extend(['--vendor_boot', self._bootimg]) 250 251 if ramdisk_option and ramdisk_option not in mkbootimg_cmd: 252 raise RuntimeError("Failed to find '{}' from:\n {}".format( 253 ramdisk_option, shlex.join(mkbootimg_cmd))) 254 # Replaces the original ramdisk with the newly packed ramdisk. 255 if ramdisk_option: 256 ramdisk_index = mkbootimg_cmd.index(ramdisk_option) + 1 257 mkbootimg_cmd[ramdisk_index] = new_ramdisk 258 259 subprocess.check_call(mkbootimg_cmd) 260 print("=== Repacked boot image: '{}' ===".format(self._bootimg)) 261 262 def add_files(self, copy_pairs): 263 """Copy files specified by copy_pairs into current ramdisk. 264 265 Args: 266 copy_pairs: a list of (src_pathname, dst_file) pairs. 267 """ 268 # Creates missing parent dirs with 0o755. 269 original_mask = os.umask(0o022) 270 for src_pathname, dst_file in copy_pairs: 271 dst_pathname = os.path.join(self.ramdisk_dir, dst_file) 272 dst_dir = os.path.dirname(dst_pathname) 273 if not os.path.exists(dst_dir): 274 print("Creating dir '{}'".format(dst_dir)) 275 os.makedirs(dst_dir, 0o755) 276 print(f"Copying file '{src_pathname}' to '{dst_pathname}'") 277 shutil.copy2(src_pathname, dst_pathname, follow_symlinks=False) 278 os.umask(original_mask) 279 280 @property 281 def ramdisk_dir(self): 282 """Returns the internal ramdisk dir.""" 283 return self._ramdisk.ramdisk_dir 284 285 286def _get_repack_usage(): 287 return """Usage examples: 288 289 * --ramdisk_add SRC_FILE:DST_FILE 290 291 If --local is given, copy SRC_FILE from the local filesystem to DST_FILE in 292 the ramdisk of --dst_bootimg. 293 If --src_bootimg is specified, copy SRC_FILE from the ramdisk of 294 --src_bootimg to DST_FILE in the ramdisk of --dst_bootimg. 295 296 Copies a local file 'userdebug_plat_sepolicy.cil' into the ramdisk of 297 --dst_bootimg, and then rebuild --dst_bootimg: 298 299 $ %(prog)s \\ 300 --local --dst_bootimg vendor_boot-debug.img \\ 301 --ramdisk_add userdebug_plat_sepolicy.cil:userdebug_plat_sepolicy.cil 302 303 Copies 'first_stage_ramdisk/userdebug_plat_sepolicy.cil' from the ramdisk 304 of --src_bootimg to 'userdebug_plat_sepolicy.cil' in the ramdisk of 305 --dst_bootimg, and then rebuild --dst_bootimg: 306 307 $ %(prog)s \\ 308 --src_bootimg boot-debug-5.4.img --dst_bootimg vendor_boot-debug.img \\ 309 --ramdisk_add first_stage_ramdisk/userdebug_plat_sepolicy.cil:userdebug_plat_sepolicy.cil 310 311 This option can be specified multiple times to copy multiple files: 312 313 $ %(prog)s \\ 314 --local --dst_bootimg vendor_boot-debug.img \\ 315 --ramdisk_add file1:path/in/dst_bootimg/file1 \\ 316 --ramdisk_add file2:path/in/dst_bootimg/file2 317""" 318 319 320def _parse_args(): 321 """Parse command-line options.""" 322 parser = argparse.ArgumentParser( 323 formatter_class=argparse.RawDescriptionHelpFormatter, 324 description='Repacks boot, recovery or vendor_boot image by importing ' 325 'ramdisk files from --src_bootimg to --dst_bootimg.', 326 epilog=_get_repack_usage(), 327 ) 328 329 src_group = parser.add_mutually_exclusive_group(required=True) 330 src_group.add_argument( 331 '--src_bootimg', help='filename to source boot image', 332 type=BootImage) 333 src_group.add_argument( 334 '--local', help='use local files as repack source', 335 action='store_true') 336 337 parser.add_argument( 338 '--dst_bootimg', help='filename to destination boot image', 339 type=BootImage, required=True) 340 parser.add_argument( 341 '--ramdisk_add', metavar='SRC_FILE:DST_FILE', 342 help='a copy pair to copy into the ramdisk of --dst_bootimg', 343 action='extend', nargs='+', required=True) 344 345 args = parser.parse_args() 346 347 # Parse args.ramdisk_add to a list of copy pairs. 348 if args.src_bootimg: 349 args.ramdisk_add = [ 350 _parse_ramdisk_copy_pair(p, args.src_bootimg.ramdisk_dir) 351 for p in args.ramdisk_add 352 ] 353 else: 354 # Repack from local files. 355 args.ramdisk_add = [ 356 _parse_ramdisk_copy_pair(p) for p in args.ramdisk_add 357 ] 358 359 return args 360 361 362def _parse_ramdisk_copy_pair(pair, src_ramdisk_dir=None): 363 """Parse a ramdisk copy pair argument.""" 364 if ':' in pair: 365 src_file, dst_file = pair.split(':', maxsplit=1) 366 else: 367 src_file = dst_file = pair 368 369 # os.path.join() only works on relative path components. 370 # If a component is an absolute path, all previous components are thrown 371 # away and joining continues from the absolute path component. 372 # So make sure the file name is not absolute before calling os.path.join(). 373 if src_ramdisk_dir: 374 if os.path.isabs(src_file): 375 raise ValueError('file name cannot be absolute when repacking from ' 376 'a ramdisk: ' + src_file) 377 src_pathname = os.path.join(src_ramdisk_dir, src_file) 378 else: 379 src_pathname = src_file 380 if os.path.isabs(dst_file): 381 raise ValueError('destination file name cannot be absolute: ' + 382 dst_file) 383 return (src_pathname, dst_file) 384 385 386def main(): 387 """Parse arguments and repack boot image.""" 388 args = _parse_args() 389 args.dst_bootimg.add_files(args.ramdisk_add) 390 args.dst_bootimg.repack_bootimg() 391 392 393if __name__ == '__main__': 394 main() 395