1#!/usr/bin/env python3
2#
3# Copyright 2021, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Repacks the boot image.
18
19Unpacks the boot image and the ramdisk inside, then add files into
20the ramdisk to repack the boot image.
21"""
22
23import argparse
24import datetime
25import enum
26import glob
27import os
28import shlex
29import shutil
30import subprocess
31import tempfile
32
33
34class TempFileManager:
35    """Manages temporary files and dirs."""
36
37    def __init__(self):
38        self._temp_files = []
39
40    def __del__(self):
41        """Removes temp dirs and files."""
42        for f in self._temp_files:
43            if os.path.isdir(f):
44                shutil.rmtree(f, ignore_errors=True)
45            else:
46                os.remove(f)
47
48    def make_temp_dir(self, prefix='tmp', suffix=''):
49        """Makes a temporary dir that will be cleaned up in the destructor.
50
51        Returns:
52            The absolute pathname of the new directory.
53        """
54        dir_name = tempfile.mkdtemp(prefix=prefix, suffix=suffix)
55        self._temp_files.append(dir_name)
56        return dir_name
57
58    def make_temp_file(self, prefix='tmp', suffix=''):
59        """Make a temp file that will be deleted in the destructor.
60
61        Returns:
62            The absolute pathname of the new file.
63        """
64        fd, file_name = tempfile.mkstemp(prefix=prefix, suffix=suffix)
65        os.close(fd)
66        self._temp_files.append(file_name)
67        return file_name
68
69
70class RamdiskFormat(enum.Enum):
71    """Enum class for different ramdisk compression formats."""
72    LZ4 = 1
73    GZIP = 2
74
75
76class BootImageType(enum.Enum):
77    """Enum class for different boot image types."""
78    BOOT_IMAGE = 1
79    VENDOR_BOOT_IMAGE = 2
80    SINGLE_RAMDISK_FRAGMENT = 3
81    MULTIPLE_RAMDISK_FRAGMENTS = 4
82
83
84class RamdiskImage:
85    """A class that supports packing/unpacking a ramdisk."""
86    def __init__(self, ramdisk_img, unpack=True):
87        self._ramdisk_img = ramdisk_img
88        self._ramdisk_format = None
89        self._ramdisk_dir = None
90        self._temp_file_manager = TempFileManager()
91
92        if unpack:
93            self._unpack_ramdisk()
94        else:
95            self._ramdisk_dir = self._temp_file_manager.make_temp_dir(
96                suffix='_new_ramdisk')
97
98    def _unpack_ramdisk(self):
99        """Unpacks the ramdisk."""
100        self._ramdisk_dir = self._temp_file_manager.make_temp_dir(
101            suffix='_' + os.path.basename(self._ramdisk_img))
102
103        # The compression format might be in 'lz4' or 'gzip' format,
104        # trying lz4 first.
105        for compression_type, compression_util in [
106            (RamdiskFormat.LZ4, 'lz4'),
107            (RamdiskFormat.GZIP, 'gzip')]:
108
109            # Command arguments:
110            #   -d: decompression
111            #   -c: write to stdout
112            decompression_cmd = [
113                compression_util, '-d', '-c', self._ramdisk_img]
114
115            decompressed_result = subprocess.run(
116                decompression_cmd, check=False, capture_output=True)
117
118            if decompressed_result.returncode == 0:
119                self._ramdisk_format = compression_type
120                break
121
122        if self._ramdisk_format is not None:
123            # toybox cpio arguments:
124            #   -i: extract files from stdin
125            #   -d: create directories if needed
126            #   -u: override existing files
127            subprocess.run(
128                ['toybox', 'cpio', '-idu'], check=True,
129                input=decompressed_result.stdout, cwd=self._ramdisk_dir)
130
131            print(f"=== Unpacked ramdisk: '{self._ramdisk_img}' at "
132                  f"'{self._ramdisk_dir}' ===")
133        else:
134            raise RuntimeError('Failed to decompress ramdisk.')
135
136    def repack_ramdisk(self, out_ramdisk_file):
137        """Repacks a ramdisk from self._ramdisk_dir.
138
139        Args:
140            out_ramdisk_file: the output ramdisk file to save.
141        """
142        compression_cmd = ['lz4', '-l', '-12', '--favor-decSpeed']
143        if self._ramdisk_format == RamdiskFormat.GZIP:
144            compression_cmd = ['gzip']
145
146        print('Repacking ramdisk, which might take a few seconds ...')
147
148        mkbootfs_result = subprocess.run(
149            ['mkbootfs', self._ramdisk_dir], check=True, capture_output=True)
150
151        with open(out_ramdisk_file, 'w') as output_fd:
152            subprocess.run(compression_cmd, check=True,
153                           input=mkbootfs_result.stdout, stdout=output_fd)
154
155        print("=== Repacked ramdisk: '{}' ===".format(out_ramdisk_file))
156
157    @property
158    def ramdisk_dir(self):
159        """Returns the internal ramdisk dir."""
160        return self._ramdisk_dir
161
162
163class BootImage:
164    """A class that supports packing/unpacking a boot.img and ramdisk."""
165
166    def __init__(self, bootimg):
167        self._bootimg = bootimg
168        self._bootimg_dir = None
169        self._bootimg_type = None
170        self._ramdisk = None
171        self._previous_mkbootimg_args = []
172        self._temp_file_manager = TempFileManager()
173
174        self._unpack_bootimg()
175
176    def _get_vendor_ramdisks(self):
177        """Returns a list of vendor ramdisks after unpack."""
178        return sorted(glob.glob(
179            os.path.join(self._bootimg_dir, 'vendor_ramdisk*')))
180
181    def _unpack_bootimg(self):
182        """Unpacks the boot.img and the ramdisk inside."""
183        self._bootimg_dir = self._temp_file_manager.make_temp_dir(
184            suffix='_' + os.path.basename(self._bootimg))
185
186        # Unpacks the boot.img first.
187        unpack_bootimg_cmds = [
188            'unpack_bootimg',
189            '--boot_img', self._bootimg,
190            '--out', self._bootimg_dir,
191            '--format=mkbootimg',
192        ]
193        result = subprocess.run(unpack_bootimg_cmds, check=True,
194                                capture_output=True, encoding='utf-8')
195        self._previous_mkbootimg_args = shlex.split(result.stdout)
196        print("=== Unpacked boot image: '{}' ===".format(self._bootimg))
197
198        # From the output dir, checks there is 'ramdisk' or 'vendor_ramdisk'.
199        ramdisk = os.path.join(self._bootimg_dir, 'ramdisk')
200        vendor_ramdisk = os.path.join(self._bootimg_dir, 'vendor_ramdisk')
201        vendor_ramdisks = self._get_vendor_ramdisks()
202        if os.path.exists(ramdisk):
203            self._ramdisk = RamdiskImage(ramdisk)
204            self._bootimg_type = BootImageType.BOOT_IMAGE
205        elif os.path.exists(vendor_ramdisk):
206            self._ramdisk = RamdiskImage(vendor_ramdisk)
207            self._bootimg_type = BootImageType.VENDOR_BOOT_IMAGE
208        elif len(vendor_ramdisks) == 1:
209            self._ramdisk = RamdiskImage(vendor_ramdisks[0])
210            self._bootimg_type = BootImageType.SINGLE_RAMDISK_FRAGMENT
211        elif len(vendor_ramdisks) > 1:
212            # Creates an empty RamdiskImage() below, without unpack.
213            # We'll then add files into this newly created ramdisk, then pack
214            # it with other vendor ramdisks together.
215            self._ramdisk = RamdiskImage(ramdisk_img=None, unpack=False)
216            self._bootimg_type = BootImageType.MULTIPLE_RAMDISK_FRAGMENTS
217        else:
218            raise RuntimeError('Both ramdisk and vendor_ramdisk do not exist.')
219
220    def repack_bootimg(self):
221        """Repacks the ramdisk and rebuild the boot.img"""
222
223        new_ramdisk = self._temp_file_manager.make_temp_file(
224            prefix='ramdisk-patched')
225        self._ramdisk.repack_ramdisk(new_ramdisk)
226
227        mkbootimg_cmd = ['mkbootimg']
228
229        # Uses previous mkbootimg args, e.g., --vendor_cmdline, --dtb_offset.
230        mkbootimg_cmd.extend(self._previous_mkbootimg_args)
231
232        ramdisk_option = ''
233        if self._bootimg_type == BootImageType.BOOT_IMAGE:
234            ramdisk_option = '--ramdisk'
235            mkbootimg_cmd.extend(['--output', self._bootimg])
236        elif self._bootimg_type == BootImageType.VENDOR_BOOT_IMAGE:
237            ramdisk_option = '--vendor_ramdisk'
238            mkbootimg_cmd.extend(['--vendor_boot', self._bootimg])
239        elif self._bootimg_type == BootImageType.SINGLE_RAMDISK_FRAGMENT:
240            ramdisk_option = '--vendor_ramdisk_fragment'
241            mkbootimg_cmd.extend(['--vendor_boot', self._bootimg])
242        elif self._bootimg_type == BootImageType.MULTIPLE_RAMDISK_FRAGMENTS:
243            mkbootimg_cmd.extend(['--ramdisk_type', 'PLATFORM'])
244            ramdisk_name = (
245                'RAMDISK_' +
246                datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S'))
247            mkbootimg_cmd.extend(['--ramdisk_name', ramdisk_name])
248            mkbootimg_cmd.extend(['--vendor_ramdisk_fragment', new_ramdisk])
249            mkbootimg_cmd.extend(['--vendor_boot', self._bootimg])
250
251        if ramdisk_option and ramdisk_option not in mkbootimg_cmd:
252            raise RuntimeError("Failed to find '{}' from:\n  {}".format(
253                ramdisk_option, shlex.join(mkbootimg_cmd)))
254        # Replaces the original ramdisk with the newly packed ramdisk.
255        if ramdisk_option:
256            ramdisk_index = mkbootimg_cmd.index(ramdisk_option) + 1
257            mkbootimg_cmd[ramdisk_index] = new_ramdisk
258
259        subprocess.check_call(mkbootimg_cmd)
260        print("=== Repacked boot image: '{}' ===".format(self._bootimg))
261
262    def add_files(self, copy_pairs):
263        """Copy files specified by copy_pairs into current ramdisk.
264
265        Args:
266            copy_pairs: a list of (src_pathname, dst_file) pairs.
267        """
268        # Creates missing parent dirs with 0o755.
269        original_mask = os.umask(0o022)
270        for src_pathname, dst_file in copy_pairs:
271            dst_pathname = os.path.join(self.ramdisk_dir, dst_file)
272            dst_dir = os.path.dirname(dst_pathname)
273            if not os.path.exists(dst_dir):
274                print("Creating dir '{}'".format(dst_dir))
275                os.makedirs(dst_dir, 0o755)
276            print(f"Copying file '{src_pathname}' to '{dst_pathname}'")
277            shutil.copy2(src_pathname, dst_pathname, follow_symlinks=False)
278        os.umask(original_mask)
279
280    @property
281    def ramdisk_dir(self):
282        """Returns the internal ramdisk dir."""
283        return self._ramdisk.ramdisk_dir
284
285
286def _get_repack_usage():
287    return """Usage examples:
288
289  * --ramdisk_add SRC_FILE:DST_FILE
290
291    If --local is given, copy SRC_FILE from the local filesystem to DST_FILE in
292    the ramdisk of --dst_bootimg.
293    If --src_bootimg is specified, copy SRC_FILE from the ramdisk of
294    --src_bootimg to DST_FILE in the ramdisk of --dst_bootimg.
295
296    Copies a local file 'userdebug_plat_sepolicy.cil' into the ramdisk of
297    --dst_bootimg, and then rebuild --dst_bootimg:
298
299    $ %(prog)s \\
300        --local --dst_bootimg vendor_boot-debug.img \\
301        --ramdisk_add userdebug_plat_sepolicy.cil:userdebug_plat_sepolicy.cil
302
303    Copies 'first_stage_ramdisk/userdebug_plat_sepolicy.cil' from the ramdisk
304    of --src_bootimg to 'userdebug_plat_sepolicy.cil' in the ramdisk of
305    --dst_bootimg, and then rebuild --dst_bootimg:
306
307    $ %(prog)s \\
308        --src_bootimg boot-debug-5.4.img --dst_bootimg vendor_boot-debug.img \\
309        --ramdisk_add first_stage_ramdisk/userdebug_plat_sepolicy.cil:userdebug_plat_sepolicy.cil
310
311    This option can be specified multiple times to copy multiple files:
312
313    $ %(prog)s \\
314        --local --dst_bootimg vendor_boot-debug.img \\
315        --ramdisk_add file1:path/in/dst_bootimg/file1 \\
316        --ramdisk_add file2:path/in/dst_bootimg/file2
317"""
318
319
320def _parse_args():
321    """Parse command-line options."""
322    parser = argparse.ArgumentParser(
323        formatter_class=argparse.RawDescriptionHelpFormatter,
324        description='Repacks boot, recovery or vendor_boot image by importing '
325                    'ramdisk files from --src_bootimg to --dst_bootimg.',
326        epilog=_get_repack_usage(),
327    )
328
329    src_group = parser.add_mutually_exclusive_group(required=True)
330    src_group.add_argument(
331        '--src_bootimg', help='filename to source boot image',
332        type=BootImage)
333    src_group.add_argument(
334        '--local', help='use local files as repack source',
335        action='store_true')
336
337    parser.add_argument(
338        '--dst_bootimg', help='filename to destination boot image',
339        type=BootImage, required=True)
340    parser.add_argument(
341        '--ramdisk_add', metavar='SRC_FILE:DST_FILE',
342        help='a copy pair to copy into the ramdisk of --dst_bootimg',
343        action='extend', nargs='+', required=True)
344
345    args = parser.parse_args()
346
347    # Parse args.ramdisk_add to a list of copy pairs.
348    if args.src_bootimg:
349        args.ramdisk_add = [
350            _parse_ramdisk_copy_pair(p, args.src_bootimg.ramdisk_dir)
351            for p in args.ramdisk_add
352        ]
353    else:
354        # Repack from local files.
355        args.ramdisk_add = [
356            _parse_ramdisk_copy_pair(p) for p in args.ramdisk_add
357        ]
358
359    return args
360
361
362def _parse_ramdisk_copy_pair(pair, src_ramdisk_dir=None):
363    """Parse a ramdisk copy pair argument."""
364    if ':' in pair:
365        src_file, dst_file = pair.split(':', maxsplit=1)
366    else:
367        src_file = dst_file = pair
368
369    # os.path.join() only works on relative path components.
370    # If a component is an absolute path, all previous components are thrown
371    # away and joining continues from the absolute path component.
372    # So make sure the file name is not absolute before calling os.path.join().
373    if src_ramdisk_dir:
374        if os.path.isabs(src_file):
375            raise ValueError('file name cannot be absolute when repacking from '
376                             'a ramdisk: ' + src_file)
377        src_pathname = os.path.join(src_ramdisk_dir, src_file)
378    else:
379        src_pathname = src_file
380    if os.path.isabs(dst_file):
381        raise ValueError('destination file name cannot be absolute: ' +
382                         dst_file)
383    return (src_pathname, dst_file)
384
385
386def main():
387    """Parse arguments and repack boot image."""
388    args = _parse_args()
389    args.dst_bootimg.add_files(args.ramdisk_add)
390    args.dst_bootimg.repack_bootimg()
391
392
393if __name__ == '__main__':
394    main()
395