1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import os
19import os.path
20import tempfile
21import zipfile
22
23import common
24import ota_metadata_pb2
25import test_utils
26from ota_utils import (
27    BuildLegacyOtaMetadata, CalculateRuntimeDevicesAndFingerprints,
28    ConstructOtaApexInfo, FinalizeMetadata, GetPackageMetadata, PropertyFiles, AbOtaPropertyFiles, PayloadGenerator, StreamingPropertyFiles)
29from ota_from_target_files import (
30    _LoadOemDicts,
31    GetTargetFilesZipForCustomImagesUpdates,
32    GetTargetFilesZipForPartialUpdates,
33    GetTargetFilesZipForSecondaryImages,
34    GetTargetFilesZipWithoutPostinstallConfig,
35    POSTINSTALL_CONFIG, AB_PARTITIONS)
36from apex_utils import GetApexInfoFromTargetFiles
37from test_utils import PropertyFilesTestCase
38from common import OPTIONS
39from payload_signer import PayloadSigner
40
41
def construct_target_files(secondary=False, compressedApex=False):
  """Builds a minimal fake target-files.zip used to generate OTA packages.

  Args:
    secondary: When True, also adds IMAGES/system_other.img, which is left out
        of META/ab_partitions.txt.
    compressedApex: When True, copies the compressed APEX test file found in
        the directory returned by test_utils.get_current_dir() into
        SYSTEM/apex/.

  Returns:
    The path of the generated zip file.
  """
  # (zip directory, partition name) for every A/B partition in the package.
  partition_layout = (
      ('IMAGES', 'boot'),
      ('IMAGES', 'system'),
      ('IMAGES', 'vendor'),
      ('RADIO', 'bootloader'),
      ('RADIO', 'modem'),
  )
  postinstall_lines = (
      "RUN_POSTINSTALL_system=true",
      "POSTINSTALL_PATH_system=system/bin/otapreopt_script",
      "FILESYSTEM_TYPE_system=ext4",
      "POSTINSTALL_OPTIONAL_system=true",
  )

  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
    output_zip.writestr(
        'META/update_engine_config.txt',
        "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
    output_zip.writestr(POSTINSTALL_CONFIG, '\n'.join(postinstall_lines))
    output_zip.writestr(
        'META/ab_partitions.txt',
        '\n'.join(name for _, name in partition_layout))

    # Each fake image holds as many random bytes as its name has characters.
    for directory, name in partition_layout:
      image_path = '{}/{}.img'.format(directory, name)
      output_zip.writestr(image_path, os.urandom(len(name)))

    # system_other shouldn't appear in META/ab_partitions.txt.
    if secondary:
      output_zip.writestr(
          'IMAGES/system_other.img', os.urandom(len("system_other")))

    if compressedApex:
      capex_name = 'com.android.apex.compressed.v1.capex'
      capex_path = os.path.join(test_utils.get_current_dir(), capex_name)
      output_zip.write(capex_path, 'SYSTEM/apex/' + capex_name)

  return zip_path
90
91
class LoadOemDictsTest(test_utils.ReleaseToolsTestCase):
  """Checks _LoadOemDicts() with no source, one file and several files."""

  @staticmethod
  def _WritePropFile(contents):
    """Writes contents to a new temp file and returns the file's path."""
    path = common.MakeTempFile()
    with open(path, 'w') as prop_fp:
      prop_fp.write(contents)
    return path

  def test_NoneDict(self):
    """Passing None as the OEM source yields None."""
    loaded = _LoadOemDicts(None)
    self.assertIsNone(loaded)

  def test_SingleDict(self):
    """A single properties file is parsed into a one-element list."""
    source = self._WritePropFile('abc=1\ndef=2\nxyz=foo\na.b.c=bar\n')

    loaded = _LoadOemDicts([source])
    self.assertEqual(1, len(loaded))
    only_dict = loaded[0]
    self.assertEqual('foo', only_dict['xyz'])
    self.assertEqual('bar', only_dict['a.b.c'])

  def test_MultipleDicts(self):
    """Several files are parsed in order, one dict per file."""
    template = 'ro.build.index={}\ndef=2\nxyz=foo\na.b.c=bar\n'
    sources = [self._WritePropFile(template.format(i)) for i in range(3)]

    loaded = _LoadOemDicts(sources)
    self.assertEqual(3, len(loaded))
    for index, props in enumerate(loaded):
      self.assertEqual('2', props['def'])
      self.assertEqual('foo', props['xyz'])
      self.assertEqual('bar', props['a.b.c'])
      # The index written into each file identifies its position.
      self.assertEqual(str(index), props['ro.build.index'])
123
124
125class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
126  TEST_TARGET_INFO_DICT = {
127      'build.prop': common.PartitionBuildProps.FromDictionary(
128          'system', {
129              'ro.product.device': 'product-device',
130              'ro.build.fingerprint': 'build-fingerprint-target',
131              'ro.build.version.incremental': 'build-version-incremental-target',
132              'ro.build.version.sdk': '27',
133              'ro.build.version.security_patch': '2017-12-01',
134              'ro.build.date.utc': '1500000000'}
135      )
136  }
137
138  TEST_SOURCE_INFO_DICT = {
139      'build.prop': common.PartitionBuildProps.FromDictionary(
140          'system', {
141              'ro.product.device': 'product-device',
142              'ro.build.fingerprint': 'build-fingerprint-source',
143              'ro.build.version.incremental': 'build-version-incremental-source',
144              'ro.build.version.sdk': '25',
145              'ro.build.version.security_patch': '2016-12-01',
146              'ro.build.date.utc': '1400000000'}
147      )
148  }
149
150  TEST_INFO_DICT_USES_OEM_PROPS = {
151      'build.prop': common.PartitionBuildProps.FromDictionary(
152          'system', {
153              'ro.product.name': 'product-name',
154              'ro.build.thumbprint': 'build-thumbprint',
155              'ro.build.bar': 'build-bar'}
156      ),
157      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
158          'vendor', {
159              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
160      ),
161      'property1': 'value1',
162      'property2': 4096,
163      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
164  }
165
166  TEST_TARGET_VENDOR_INFO_DICT = common.PartitionBuildProps.FromDictionary(
167    'vendor', {
168      'ro.vendor.build.date.utc' : '87654321',
169      'ro.product.vendor.device':'vendor-device',
170      'ro.vendor.build.fingerprint': 'build-fingerprint-vendor'}
171  )
172
173  TEST_SOURCE_VENDOR_INFO_DICT = common.PartitionBuildProps.FromDictionary(
174    'vendor', {
175      'ro.vendor.build.date.utc' : '12345678',
176      'ro.product.vendor.device':'vendor-device',
177      'ro.vendor.build.fingerprint': 'build-fingerprint-vendor'}
178  )
179
180  def setUp(self):
181    self.testdata_dir = test_utils.get_testdata_dir()
182    self.assertTrue(os.path.exists(self.testdata_dir))
183
184    # Reset the global options as in ota_from_target_files.py.
185    common.OPTIONS.incremental_source = None
186    common.OPTIONS.downgrade = False
187    common.OPTIONS.retrofit_dynamic_partitions = False
188    common.OPTIONS.timestamp = False
189    common.OPTIONS.wipe_user_data = False
190    common.OPTIONS.no_signing = False
191    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
192    common.OPTIONS.key_passwords = {
193        common.OPTIONS.package_key: None,
194    }
195
196    common.OPTIONS.search_path = test_utils.get_search_path()
197
198  @staticmethod
199  def GetLegacyOtaMetadata(target_info, source_info=None):
200    metadata_proto = GetPackageMetadata(target_info, source_info)
201    return BuildLegacyOtaMetadata(metadata_proto)
202
203  def test_GetPackageMetadata_abOta_full(self):
204    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
205    target_info_dict['ab_update'] = 'true'
206    target_info_dict['ab_partitions'] = []
207    target_info = common.BuildInfo(target_info_dict, None)
208    metadata = self.GetLegacyOtaMetadata(target_info)
209    self.assertDictEqual(
210        {
211            'ota-type': 'AB',
212            'ota-required-cache': '0',
213            'post-build': 'build-fingerprint-target',
214            'post-build-incremental': 'build-version-incremental-target',
215            'post-sdk-level': '27',
216            'post-security-patch-level': '2017-12-01',
217            'post-timestamp': '1500000000',
218            'pre-device': 'product-device',
219        },
220        metadata)
221
222  def test_GetPackageMetadata_abOta_incremental(self):
223    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
224    target_info_dict['ab_update'] = 'true'
225    target_info_dict['ab_partitions'] = []
226    target_info = common.BuildInfo(target_info_dict, None)
227    source_info = common.BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
228    common.OPTIONS.incremental_source = ''
229    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
230    self.assertDictEqual(
231        {
232            'ota-type': 'AB',
233            'ota-required-cache': '0',
234            'post-build': 'build-fingerprint-target',
235            'post-build-incremental': 'build-version-incremental-target',
236            'post-sdk-level': '27',
237            'post-security-patch-level': '2017-12-01',
238            'post-timestamp': '1500000000',
239            'pre-device': 'product-device',
240            'pre-build': 'build-fingerprint-source',
241            'pre-build-incremental': 'build-version-incremental-source',
242        },
243        metadata)
244
245  def test_GetPackageMetadata_nonAbOta_full(self):
246    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
247    metadata = self.GetLegacyOtaMetadata(target_info)
248    self.assertDictEqual(
249        {
250            'ota-type': 'BLOCK',
251            'ota-required-cache': '0',
252            'post-build': 'build-fingerprint-target',
253            'post-build-incremental': 'build-version-incremental-target',
254            'post-sdk-level': '27',
255            'post-security-patch-level': '2017-12-01',
256            'post-timestamp': '1500000000',
257            'pre-device': 'product-device',
258        },
259        metadata)
260
261  def test_GetPackageMetadata_nonAbOta_incremental(self):
262    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
263    source_info = common.BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
264    common.OPTIONS.incremental_source = ''
265    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
266    self.assertDictEqual(
267        {
268            'ota-type': 'BLOCK',
269            'ota-required-cache': '0',
270            'post-build': 'build-fingerprint-target',
271            'post-build-incremental': 'build-version-incremental-target',
272            'post-sdk-level': '27',
273            'post-security-patch-level': '2017-12-01',
274            'post-timestamp': '1500000000',
275            'pre-device': 'product-device',
276            'pre-build': 'build-fingerprint-source',
277            'pre-build-incremental': 'build-version-incremental-source',
278        },
279        metadata)
280
281  def test_GetPackageMetadata_wipe(self):
282    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
283    common.OPTIONS.wipe_user_data = True
284    metadata = self.GetLegacyOtaMetadata(target_info)
285    self.assertDictEqual(
286        {
287            'ota-type': 'BLOCK',
288            'ota-required-cache': '0',
289            'ota-wipe': 'yes',
290            'post-build': 'build-fingerprint-target',
291            'post-build-incremental': 'build-version-incremental-target',
292            'post-sdk-level': '27',
293            'post-security-patch-level': '2017-12-01',
294            'post-timestamp': '1500000000',
295            'pre-device': 'product-device',
296        },
297        metadata)
298
299  @test_utils.SkipIfExternalToolsUnavailable()
300  def test_GetApexInfoFromTargetFiles(self):
301    target_files = construct_target_files(compressedApex=True)
302    apex_infos = GetApexInfoFromTargetFiles(target_files)
303    self.assertEqual(len(apex_infos), 1)
304    self.assertEqual(apex_infos[0].package_name, "com.android.apex.compressed")
305    self.assertEqual(apex_infos[0].version, 1)
306    self.assertEqual(apex_infos[0].is_compressed, True)
307    # Compare the decompressed APEX size with the original uncompressed APEX
308    original_apex_name = 'com.android.apex.compressed.v1_original.apex'
309    original_apex_filepath = os.path.join(
310        test_utils.get_current_dir(), original_apex_name)
311    uncompressed_apex_size = os.path.getsize(original_apex_filepath)
312    self.assertEqual(apex_infos[0].decompressed_size, uncompressed_apex_size)
313
314  @staticmethod
315  def construct_tf_with_apex_info(infos):
316    apex_metadata_proto = ota_metadata_pb2.ApexMetadata()
317    apex_metadata_proto.apex_info.extend(infos)
318
319    output = common.MakeTempFile(suffix='.zip')
320    with zipfile.ZipFile(output, 'w') as zfp:
321      common.ZipWriteStr(zfp, "META/apex_info.pb",
322                         apex_metadata_proto.SerializeToString())
323    return output
324
325  def test_ConstructOtaApexInfo_incremental_package(self):
326    infos = [ota_metadata_pb2.ApexInfo(package_name='com.android.apex.1',
327                                       version=1000, is_compressed=False),
328             ota_metadata_pb2.ApexInfo(package_name='com.android.apex.2',
329                                       version=2000, is_compressed=True)]
330    target_file = self.construct_tf_with_apex_info(infos)
331
332    with zipfile.ZipFile(target_file) as target_zip:
333      info_bytes = ConstructOtaApexInfo(target_zip, source_file=target_file)
334    apex_metadata_proto = ota_metadata_pb2.ApexMetadata()
335    apex_metadata_proto.ParseFromString(info_bytes)
336
337    info_list = apex_metadata_proto.apex_info
338    self.assertEqual(2, len(info_list))
339    self.assertEqual('com.android.apex.1', info_list[0].package_name)
340    self.assertEqual(1000, info_list[0].version)
341    self.assertEqual(1000, info_list[0].source_version)
342
343  def test_GetPackageMetadata_retrofitDynamicPartitions(self):
344    target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
345    common.OPTIONS.retrofit_dynamic_partitions = True
346    metadata = self.GetLegacyOtaMetadata(target_info)
347    self.assertDictEqual(
348        {
349            'ota-retrofit-dynamic-partitions': 'yes',
350            'ota-type': 'BLOCK',
351            'ota-required-cache': '0',
352            'post-build': 'build-fingerprint-target',
353            'post-build-incremental': 'build-version-incremental-target',
354            'post-sdk-level': '27',
355            'post-security-patch-level': '2017-12-01',
356            'post-timestamp': '1500000000',
357            'pre-device': 'product-device',
358        },
359        metadata)
360
361  @staticmethod
362  def _test_GetPackageMetadata_swapBuildTimestamps(target_info, source_info):
363    (target_info['build.prop'].build_props['ro.build.date.utc'],
364     source_info['build.prop'].build_props['ro.build.date.utc']) = (
365         source_info['build.prop'].build_props['ro.build.date.utc'],
366         target_info['build.prop'].build_props['ro.build.date.utc'])
367
368  @staticmethod
369  def _test_GetPackageMetadata_swapVendorBuildTimestamps(target_info, source_info):
370    (target_info['vendor.build.prop'].build_props['ro.vendor.build.date.utc'],
371     source_info['vendor.build.prop'].build_props['ro.vendor.build.date.utc']) = (
372         source_info['vendor.build.prop'].build_props['ro.vendor.build.date.utc'],
373         target_info['vendor.build.prop'].build_props['ro.vendor.build.date.utc'])
374
375  def test_GetPackageMetadata_unintentionalDowngradeDetected(self):
376    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
377    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
378    self._test_GetPackageMetadata_swapBuildTimestamps(
379        target_info_dict, source_info_dict)
380
381    target_info = common.BuildInfo(target_info_dict, None)
382    source_info = common.BuildInfo(source_info_dict, None)
383    common.OPTIONS.incremental_source = ''
384    self.assertRaises(RuntimeError, self.GetLegacyOtaMetadata, target_info,
385                      source_info)
386
387  def test_GetPackageMetadata_unintentionalVendorDowngradeDetected(self):
388    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
389    target_info_dict['ab_update'] = 'true'
390    target_info_dict['ab_partitions'] = ['vendor']
391    target_info_dict["vendor.build.prop"] = copy.deepcopy(self.TEST_TARGET_VENDOR_INFO_DICT)
392    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
393    source_info_dict['ab_update'] = 'true'
394    source_info_dict['ab_partitions'] = ['vendor']
395    source_info_dict["vendor.build.prop"] = copy.deepcopy(self.TEST_SOURCE_VENDOR_INFO_DICT)
396    self._test_GetPackageMetadata_swapVendorBuildTimestamps(
397        target_info_dict, source_info_dict)
398
399    target_info = common.BuildInfo(target_info_dict, None)
400    source_info = common.BuildInfo(source_info_dict, None)
401    common.OPTIONS.incremental_source = ''
402    self.assertRaises(RuntimeError, self.GetLegacyOtaMetadata, target_info,
403                      source_info)
404
405  def test_GetPackageMetadata_downgrade(self):
406    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
407    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
408    self._test_GetPackageMetadata_swapBuildTimestamps(
409        target_info_dict, source_info_dict)
410
411    target_info = common.BuildInfo(target_info_dict, None)
412    source_info = common.BuildInfo(source_info_dict, None)
413    common.OPTIONS.incremental_source = ''
414    common.OPTIONS.downgrade = True
415    common.OPTIONS.wipe_user_data = True
416    common.OPTIONS.spl_downgrade = True
417    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
418    # Reset spl_downgrade so other tests are unaffected
419    common.OPTIONS.spl_downgrade = False
420
421    self.assertDictEqual(
422        {
423            'ota-downgrade': 'yes',
424            'ota-type': 'BLOCK',
425            'ota-required-cache': '0',
426            'ota-wipe': 'yes',
427            'post-build': 'build-fingerprint-target',
428            'post-build-incremental': 'build-version-incremental-target',
429            'post-sdk-level': '27',
430            'post-security-patch-level': '2017-12-01',
431            'post-timestamp': '1400000000',
432            'pre-device': 'product-device',
433            'pre-build': 'build-fingerprint-source',
434            'pre-build-incremental': 'build-version-incremental-source',
435            'spl-downgrade': 'yes',
436        },
437        metadata)
438
439  def test_GetPackageMetadata_vendorDowngrade(self):
440    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
441    target_info_dict['ab_update'] = 'true'
442    target_info_dict['ab_partitions'] = ['vendor']
443    target_info_dict["vendor.build.prop"] = copy.deepcopy(self.TEST_TARGET_VENDOR_INFO_DICT)
444    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
445    source_info_dict['ab_update'] = 'true'
446    source_info_dict['ab_partitions'] = ['vendor']
447    source_info_dict["vendor.build.prop"] = copy.deepcopy(self.TEST_SOURCE_VENDOR_INFO_DICT)
448    self._test_GetPackageMetadata_swapVendorBuildTimestamps(
449        target_info_dict, source_info_dict)
450
451    target_info = common.BuildInfo(target_info_dict, None)
452    source_info = common.BuildInfo(source_info_dict, None)
453    common.OPTIONS.incremental_source = ''
454    common.OPTIONS.downgrade = True
455    common.OPTIONS.wipe_user_data = True
456    common.OPTIONS.spl_downgrade = True
457    metadata = self.GetLegacyOtaMetadata(target_info, source_info)
458    # Reset spl_downgrade so other tests are unaffected
459    common.OPTIONS.spl_downgrade = False
460
461    self.assertDictEqual(
462        {
463            'ota-downgrade': 'yes',
464            'ota-type': 'AB',
465            'ota-required-cache': '0',
466            'ota-wipe': 'yes',
467            'post-build': 'build-fingerprint-target',
468            'post-build-incremental': 'build-version-incremental-target',
469            'post-sdk-level': '27',
470            'post-security-patch-level': '2017-12-01',
471            'post-timestamp': '1500000000',
472            'pre-device': 'product-device',
473            'pre-build': 'build-fingerprint-source',
474            'pre-build-incremental': 'build-version-incremental-source',
475            'spl-downgrade': 'yes',
476        },
477        metadata)
478
479    post_build = GetPackageMetadata(target_info, source_info).postcondition
480    self.assertEqual('vendor', post_build.partition_state[0].partition_name)
481    self.assertEqual('12345678', post_build.partition_state[0].version)
482
483    pre_build = GetPackageMetadata(target_info, source_info).precondition
484    self.assertEqual('vendor', pre_build.partition_state[0].partition_name)
485    self.assertEqual('87654321', pre_build.partition_state[0].version)
486
487
488  @test_utils.SkipIfExternalToolsUnavailable()
489  def test_GetTargetFilesZipForSecondaryImages(self):
490    input_file = construct_target_files(secondary=True)
491    target_file = GetTargetFilesZipForSecondaryImages(input_file)
492
493    with zipfile.ZipFile(target_file) as verify_zip:
494      namelist = verify_zip.namelist()
495      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
496
497    self.assertIn('META/ab_partitions.txt', namelist)
498    self.assertIn('IMAGES/system.img', namelist)
499    self.assertIn('RADIO/bootloader.img', namelist)
500    self.assertIn(POSTINSTALL_CONFIG, namelist)
501
502    self.assertNotIn('IMAGES/boot.img', namelist)
503    self.assertNotIn('IMAGES/system_other.img', namelist)
504    self.assertNotIn('IMAGES/system.map', namelist)
505    self.assertNotIn('RADIO/modem.img', namelist)
506
507    expected_ab_partitions = ['system', 'bootloader']
508    self.assertEqual('\n'.join(expected_ab_partitions), ab_partitions)
509
510  @test_utils.SkipIfExternalToolsUnavailable()
511  def test_GetTargetFilesZipForSecondaryImages_skipPostinstall(self):
512    input_file = construct_target_files(secondary=True)
513    target_file = GetTargetFilesZipForSecondaryImages(
514        input_file, skip_postinstall=True)
515
516    with zipfile.ZipFile(target_file) as verify_zip:
517      namelist = verify_zip.namelist()
518
519    self.assertIn('META/ab_partitions.txt', namelist)
520    self.assertIn('IMAGES/system.img', namelist)
521    self.assertIn('RADIO/bootloader.img', namelist)
522
523    self.assertNotIn('IMAGES/boot.img', namelist)
524    self.assertNotIn('IMAGES/system_other.img', namelist)
525    self.assertNotIn('IMAGES/system.map', namelist)
526    self.assertNotIn('RADIO/modem.img', namelist)
527    self.assertNotIn(POSTINSTALL_CONFIG, namelist)
528
529  @test_utils.SkipIfExternalToolsUnavailable()
530  def test_GetTargetFilesZipForSecondaryImages_withoutRadioImages(self):
531    input_file = construct_target_files(secondary=True)
532    common.ZipDelete(input_file, 'RADIO/bootloader.img')
533    common.ZipDelete(input_file, 'RADIO/modem.img')
534    target_file = GetTargetFilesZipForSecondaryImages(input_file)
535
536    with zipfile.ZipFile(target_file) as verify_zip:
537      namelist = verify_zip.namelist()
538
539    self.assertIn('META/ab_partitions.txt', namelist)
540    self.assertIn('IMAGES/system.img', namelist)
541    self.assertIn(POSTINSTALL_CONFIG, namelist)
542
543    self.assertNotIn('IMAGES/boot.img', namelist)
544    self.assertNotIn('IMAGES/system_other.img', namelist)
545    self.assertNotIn('IMAGES/system.map', namelist)
546    self.assertNotIn('RADIO/bootloader.img', namelist)
547    self.assertNotIn('RADIO/modem.img', namelist)
548
549  @test_utils.SkipIfExternalToolsUnavailable()
550  def test_GetTargetFilesZipForSecondaryImages_dynamicPartitions(self):
551    input_file = construct_target_files(secondary=True)
552    misc_info = '\n'.join([
553        'use_dynamic_partition_size=true',
554        'use_dynamic_partitions=true',
555        'dynamic_partition_list=system vendor product',
556        'super_partition_groups=google_dynamic_partitions',
557        'super_google_dynamic_partitions_group_size=4873781248',
558        'super_google_dynamic_partitions_partition_list=system vendor product',
559    ])
560    dynamic_partitions_info = '\n'.join([
561        'super_partition_groups=google_dynamic_partitions',
562        'super_google_dynamic_partitions_group_size=4873781248',
563        'super_google_dynamic_partitions_partition_list=system vendor product',
564    ])
565
566    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
567      common.ZipWriteStr(append_zip, 'META/misc_info.txt', misc_info)
568      common.ZipWriteStr(append_zip, 'META/dynamic_partitions_info.txt',
569                         dynamic_partitions_info)
570
571    target_file = GetTargetFilesZipForSecondaryImages(input_file)
572
573    with zipfile.ZipFile(target_file) as verify_zip:
574      namelist = verify_zip.namelist()
575      updated_misc_info = verify_zip.read('META/misc_info.txt').decode()
576      updated_dynamic_partitions_info = verify_zip.read(
577          'META/dynamic_partitions_info.txt').decode()
578
579    self.assertIn('META/ab_partitions.txt', namelist)
580    self.assertIn('IMAGES/system.img', namelist)
581    self.assertIn(POSTINSTALL_CONFIG, namelist)
582    self.assertIn('META/misc_info.txt', namelist)
583    self.assertIn('META/dynamic_partitions_info.txt', namelist)
584
585    self.assertNotIn('IMAGES/boot.img', namelist)
586    self.assertNotIn('IMAGES/system_other.img', namelist)
587    self.assertNotIn('IMAGES/system.map', namelist)
588
589    # Check the vendor & product are removed from the partitions list.
590    expected_misc_info = misc_info.replace('system vendor product',
591                                           'system')
592    expected_dynamic_partitions_info = dynamic_partitions_info.replace(
593        'system vendor product', 'system')
594    self.assertEqual(expected_misc_info, updated_misc_info)
595    self.assertEqual(expected_dynamic_partitions_info,
596                     updated_dynamic_partitions_info)
597
598  @test_utils.SkipIfExternalToolsUnavailable()
599  def test_GetTargetFilesZipForPartialUpdates_singlePartition(self):
600    input_file = construct_target_files()
601    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
602      common.ZipWriteStr(append_zip, 'IMAGES/system.map', 'fake map')
603
604    target_file = GetTargetFilesZipForPartialUpdates(input_file, ['system'])
605    with zipfile.ZipFile(target_file) as verify_zip:
606      namelist = verify_zip.namelist()
607      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
608
609    self.assertIn('META/ab_partitions.txt', namelist)
610    self.assertIn('META/update_engine_config.txt', namelist)
611    self.assertIn('IMAGES/system.img', namelist)
612    self.assertIn('IMAGES/system.map', namelist)
613
614    self.assertNotIn('IMAGES/boot.img', namelist)
615    self.assertNotIn('IMAGES/system_other.img', namelist)
616    self.assertNotIn('RADIO/bootloader.img', namelist)
617    self.assertNotIn('RADIO/modem.img', namelist)
618
619    self.assertEqual('system', ab_partitions)
620
621  @test_utils.SkipIfExternalToolsUnavailable()
622  def test_GetTargetFilesZipForPartialUpdates_unrecognizedPartition(self):
623    input_file = construct_target_files()
624    self.assertRaises(ValueError, GetTargetFilesZipForPartialUpdates,
625                      input_file, ['product'])
626
627  @test_utils.SkipIfExternalToolsUnavailable()
628  def test_GetTargetFilesZipForPartialUpdates_dynamicPartitions(self):
629    input_file = construct_target_files(secondary=True)
630    misc_info = '\n'.join([
631        'use_dynamic_partition_size=true',
632        'use_dynamic_partitions=true',
633        'dynamic_partition_list=system vendor product',
634        'super_partition_groups=google_dynamic_partitions',
635        'super_google_dynamic_partitions_group_size=4873781248',
636        'super_google_dynamic_partitions_partition_list=system vendor product',
637    ])
638    dynamic_partitions_info = '\n'.join([
639        'super_partition_groups=google_dynamic_partitions',
640        'super_google_dynamic_partitions_group_size=4873781248',
641        'super_google_dynamic_partitions_partition_list=system vendor product',
642    ])
643
644    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
645      common.ZipWriteStr(append_zip, 'META/misc_info.txt', misc_info)
646      common.ZipWriteStr(append_zip, 'META/dynamic_partitions_info.txt',
647                         dynamic_partitions_info)
648
649    target_file = GetTargetFilesZipForPartialUpdates(input_file,
650                                                     ['boot', 'system'])
651    with zipfile.ZipFile(target_file) as verify_zip:
652      namelist = verify_zip.namelist()
653      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
654      updated_misc_info = verify_zip.read('META/misc_info.txt').decode()
655      updated_dynamic_partitions_info = verify_zip.read(
656          'META/dynamic_partitions_info.txt').decode()
657
658    self.assertIn('META/ab_partitions.txt', namelist)
659    self.assertIn('IMAGES/boot.img', namelist)
660    self.assertIn('IMAGES/system.img', namelist)
661    self.assertIn('META/misc_info.txt', namelist)
662    self.assertIn('META/dynamic_partitions_info.txt', namelist)
663
664    self.assertNotIn('IMAGES/system_other.img', namelist)
665    self.assertNotIn('RADIO/bootloader.img', namelist)
666    self.assertNotIn('RADIO/modem.img', namelist)
667
668    # Check the vendor & product are removed from the partitions list.
669    expected_misc_info = misc_info.replace('system vendor product',
670                                           'system')
671    expected_dynamic_partitions_info = dynamic_partitions_info.replace(
672        'system vendor product', 'system')
673    self.assertEqual(expected_misc_info, updated_misc_info)
674    self.assertEqual(expected_dynamic_partitions_info,
675                     updated_dynamic_partitions_info)
676    self.assertEqual('boot\nsystem', ab_partitions)
677
678  @test_utils.SkipIfExternalToolsUnavailable()
679  def test_GetTargetFilesZipWithoutPostinstallConfig(self):
680    input_file = construct_target_files()
681    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
682    with zipfile.ZipFile(target_file) as verify_zip:
683      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
684
685  @test_utils.SkipIfExternalToolsUnavailable()
686  def test_GetTargetFilesZipWithoutPostinstallConfig_missingEntry(self):
687    input_file = construct_target_files()
688    common.ZipDelete(input_file, POSTINSTALL_CONFIG)
689    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
690    with zipfile.ZipFile(target_file) as verify_zip:
691      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
692
693  @test_utils.SkipIfExternalToolsUnavailable()
694  def test_GetTargetFilesZipForCustomImagesUpdates_oemDefaultImage(self):
695    input_file = construct_target_files()
696    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
697      common.ZipWriteStr(append_zip, 'IMAGES/oem.img', 'oem')
698      common.ZipWriteStr(append_zip, 'IMAGES/oem_test.img', 'oem_test')
699
700    target_file = GetTargetFilesZipForCustomImagesUpdates(
701        input_file, {'oem': 'oem.img'})
702
703    with zipfile.ZipFile(target_file) as verify_zip:
704      namelist = verify_zip.namelist()
705      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
706      oem_image = verify_zip.read('IMAGES/oem.img').decode()
707
708    self.assertIn('META/ab_partitions.txt', namelist)
709    self.assertEqual('boot\nsystem\nvendor\nbootloader\nmodem', ab_partitions)
710    self.assertIn('IMAGES/oem.img', namelist)
711    self.assertEqual('oem', oem_image)
712
713  @test_utils.SkipIfExternalToolsUnavailable()
714  def test_GetTargetFilesZipForCustomImagesUpdates_oemTestImage(self):
715    input_file = construct_target_files()
716    with zipfile.ZipFile(input_file, 'a', allowZip64=True) as append_zip:
717      common.ZipWriteStr(append_zip, 'IMAGES/oem.img', 'oem')
718      common.ZipWriteStr(append_zip, 'IMAGES/oem_test.img', 'oem_test')
719
720    target_file = GetTargetFilesZipForCustomImagesUpdates(
721        input_file, {'oem': 'oem_test.img'})
722
723    with zipfile.ZipFile(target_file) as verify_zip:
724      namelist = verify_zip.namelist()
725      ab_partitions = verify_zip.read('META/ab_partitions.txt').decode()
726      oem_image = verify_zip.read('IMAGES/oem.img').decode()
727
728    self.assertIn('META/ab_partitions.txt', namelist)
729    self.assertEqual('boot\nsystem\nvendor\nbootloader\nmodem', ab_partitions)
730    self.assertIn('IMAGES/oem.img', namelist)
731    self.assertEqual('oem_test', oem_image)
732
733  def _test_FinalizeMetadata(self, large_entry=False):
734    entries = [
735        'required-entry1',
736        'required-entry2',
737    ]
738    zip_file = PropertyFilesTest.construct_zip_package(entries)
739    # Add a large entry of 1 GiB if requested.
740    if large_entry:
741      with zipfile.ZipFile(zip_file, 'a', allowZip64=True) as zip_fp:
742        zip_fp.writestr(
743            # Using 'zoo' so that the entry stays behind others after signing.
744            'zoo',
745            'A' * 1024 * 1024 * 1024,
746            zipfile.ZIP_STORED)
747
748    metadata = ota_metadata_pb2.OtaMetadata()
749    output_file = common.MakeTempFile(suffix='.zip')
750    needed_property_files = (
751        TestPropertyFiles(),
752    )
753    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
754    self.assertIn('ota-test-property-files', metadata.property_files)
755
756  @test_utils.SkipIfExternalToolsUnavailable()
757  def test_FinalizeMetadata(self):
758    self._test_FinalizeMetadata()
759
760  @test_utils.SkipIfExternalToolsUnavailable()
761  def test_FinalizeMetadata_withNoSigning(self):
762    common.OPTIONS.no_signing = True
763    self._test_FinalizeMetadata()
764
765  @test_utils.SkipIfExternalToolsUnavailable()
766  def test_FinalizeMetadata_largeEntry(self):
767    self._test_FinalizeMetadata(large_entry=True)
768
769  @test_utils.SkipIfExternalToolsUnavailable()
770  def test_FinalizeMetadata_largeEntry_withNoSigning(self):
771    common.OPTIONS.no_signing = True
772    self._test_FinalizeMetadata(large_entry=True)
773
774  @test_utils.SkipIfExternalToolsUnavailable()
775  def test_FinalizeMetadata_insufficientSpace(self):
776    entries = [
777        'required-entry1',
778        'required-entry2',
779        'optional-entry1',
780        'optional-entry2',
781    ]
782    zip_file = PropertyFilesTest.construct_zip_package(entries)
783    with zipfile.ZipFile(zip_file, 'a', allowZip64=True) as zip_fp:
784      zip_fp.writestr(
785          # 'foo-entry1' will appear ahead of all other entries (in alphabetical
786          # order) after the signing, which will in turn trigger the
787          # InsufficientSpaceException and an automatic retry.
788          'foo-entry1',
789          'A' * 1024 * 1024,
790          zipfile.ZIP_STORED)
791
792    metadata = ota_metadata_pb2.OtaMetadata()
793    needed_property_files = (
794        TestPropertyFiles(),
795    )
796    output_file = common.MakeTempFile(suffix='.zip')
797    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
798    self.assertIn('ota-test-property-files', metadata.property_files)
799
800
class TestPropertyFiles(PropertyFiles):
  """PropertyFiles subclass with fixed entry names, for use by the tests."""

  def __init__(self):
    super().__init__()
    self.name = 'ota-test-property-files'
    # Entry names this subclass declares as required and optional.
    self.required = ('required-entry1', 'required-entry2')
    self.optional = ('optional-entry1', 'optional-entry2')
815
816
class PropertyFilesTest(PropertyFilesTestCase):
  """Exercises Compute, Finalize and Verify through TestPropertyFiles."""

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Compute(self):
    """Computes the property-files string with only required entries."""
    names = ('required-entry1', 'required-entry2')
    package = self.construct_zip_package(names)
    subject = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      computed = subject.Compute(package_fp)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(4, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Compute_withOptionalEntries(self):
    """Computes the string when the optional entries are present too."""
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
    )
    package = self.construct_zip_package(names)
    subject = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      computed = subject.Compute(package_fp)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(6, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Compute_missingRequiredEntry(self):
    """Expects a KeyError when 'required-entry1' is not in the package."""
    package = self.construct_zip_package(('required-entry2',))
    subject = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      with self.assertRaises(KeyError):
        subject.Compute(package_fp)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize(self):
    """Finalizes with exactly the length of the unpadded string."""
    names = [
        'required-entry1',
        'required-entry2',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ]
    package = self.construct_zip_package(names)
    subject = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      unpadded = subject.GetPropertyFilesString(
          package_fp, reserve_space=False)
      finalized = subject.Finalize(package_fp, len(unpadded))
    parsed = self._parse_property_files_string(finalized)

    self.assertEqual(4, len(parsed))
    # The two 'META-INF/com/android/...' entries are key'd as 'metadata' and
    # 'metadata.pb' in the streaming metadata.
    names[2:] = ['metadata', 'metadata.pb']
    self._verify_entries(package, parsed, names)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize_assertReservedLength(self):
    """Finalizes with exact, too small, and larger reserved lengths."""
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    )
    package = self.construct_zip_package(names)
    subject = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      # Length of the string without any padding space.
      unpadded = subject.GetPropertyFilesString(
          package_fp, reserve_space=False)
      exact_length = len(unpadded)

      # Exactly the needed length: the result is not padded.
      finalized = subject.Finalize(package_fp, exact_length)
      self.assertEqual(exact_length, len(finalized))

      # One byte short: must be rejected.
      with self.assertRaises(PropertyFiles.InsufficientSpaceException):
        subject.Finalize(package_fp, exact_length - 1)

      # More than needed: the surplus is filled with spaces.
      surplus = 20
      finalized = subject.Finalize(package_fp, exact_length + surplus)
      self.assertEqual(exact_length + surplus, len(finalized))
      self.assertEqual(' ' * surplus, finalized[exact_length:])

  def test_Verify(self):
    """Verifies the unpadded string and rejects a modified one."""
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    )
    package = self.construct_zip_package(names)
    subject = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      # The string without any padding space.
      unpadded = subject.GetPropertyFilesString(
          package_fp, reserve_space=False)

      # Returns normally when verification passes.
      subject.Verify(package_fp, unpadded)

      # Raises when the string does not match.
      with self.assertRaises(AssertionError):
        subject.Verify(package_fp, unpadded + 'x')
940
941
class StreamingPropertyFilesTest(PropertyFilesTestCase):
  """Additional validity checks specialized for StreamingPropertyFiles."""

  def test_init(self):
    """Checks the name and the required/optional entry tuples."""
    expected_required = ('payload.bin', 'payload_properties.txt')
    expected_optional = (
        'apex_info.pb',
        'care_map.pb',
        'care_map.txt',
        'compatibility.zip',
    )
    subject = StreamingPropertyFiles()
    self.assertEqual('ota-streaming-property-files', subject.name)
    self.assertEqual(expected_required, subject.required)
    self.assertEqual(expected_optional, subject.optional)

  def test_Compute(self):
    """Computes the string for a package with four known entries."""
    names = (
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
    )
    package = self.construct_zip_package(names)
    subject = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      computed = subject.Compute(package_fp)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(6, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Finalize(self):
    """Finalizes with exactly the length of the unpadded string."""
    names = [
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    ]
    package = self.construct_zip_package(names)
    subject = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      unpadded = subject.GetPropertyFilesString(
          package_fp, reserve_space=False)
      finalized = subject.Finalize(package_fp, len(unpadded))
    parsed = self._parse_property_files_string(finalized)

    self.assertEqual(6, len(parsed))
    # The two 'META-INF/com/android/...' entries are key'd as 'metadata' and
    # 'metadata.pb' in the streaming metadata.
    names[4:] = ['metadata', 'metadata.pb']
    self._verify_entries(package, parsed, names)

  def test_Verify(self):
    """Verifies the unpadded string and rejects a modified one."""
    names = (
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
        'META-INF/com/android/metadata.pb',
    )
    package = self.construct_zip_package(names)
    subject = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      # The string without any padding space.
      unpadded = subject.GetPropertyFilesString(
          package_fp, reserve_space=False)

      # Returns normally when verification passes.
      subject.Verify(package_fp, unpadded)

      # Raises when the string does not match.
      with self.assertRaises(AssertionError):
        subject.Verify(package_fp, unpadded + 'x')
1025
1026
class AbOtaPropertyFilesTest(PropertyFilesTestCase):
  """Additional validity checks specialized for AbOtaPropertyFiles."""

  # Size in bytes used for both the payload and the metadata signature.
  SIGNATURE_SIZE = 256

  def setUp(self):
    """Points the global signing options at the 'testkey' in testdata."""
    testdata_dir = test_utils.get_testdata_dir()
    self.testdata_dir = testdata_dir
    self.assertTrue(os.path.exists(testdata_dir))

    package_key = os.path.join(testdata_dir, 'testkey')
    common.OPTIONS.wipe_user_data = False
    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = None
    common.OPTIONS.package_key = package_key
    common.OPTIONS.key_passwords = {package_key: None}

  def test_init(self):
    """Checks the name and the required/optional entry tuples."""
    expected_required = ('payload.bin', 'payload_properties.txt')
    expected_optional = (
        'apex_info.pb',
        'care_map.pb',
        'care_map.txt',
        'compatibility.zip',
    )
    subject = AbOtaPropertyFiles()
    self.assertEqual('ota-property-files', subject.name)
    self.assertEqual(expected_required, subject.required)
    self.assertEqual(expected_optional, subject.optional)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetPayloadMetadataOffsetAndSize(self):
    """Compares the embedded metadata signature against an oracle."""
    target_files = construct_target_files()
    generator = PayloadGenerator()
    generator.Generate(target_files)

    signer = PayloadSigner()
    generator.Sign(signer)

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_fp:
      generator.WriteToZip(package_fp)

    # Ask the class under test where the payload metadata lives.
    subject = AbOtaPropertyFiles()
    with zipfile.ZipFile(package) as package_fp:
      # pylint: disable=protected-access
      offset, total_size = subject._GetPayloadMetadataOffsetAndSize(
          package_fp)

    # The signature proto has the following format (details in
    #  /platform/system/update_engine/update_metadata.proto):
    #  message Signature {
    #    optional uint32 version = 1;
    #    optional bytes data = 2;
    #    optional fixed32 unpadded_signature_size = 3;
    #  }
    #
    # According to the protobuf encoding, the tail of the signature message
    # is [signature string (256 bytes) + encoding of the fixed32 number 256].
    # 256 is encoded as '\x1d\x00\x01\x00\x00':
    # [3 (field number) << 3 | 5 (type) + byte reverse of 0x100 (256)].
    # Details in (https://developers.google.com/protocol-buffers/docs/encoding)
    size_field = b'\x1d\x00\x01\x00\x00'
    tail_length = self.SIGNATURE_SIZE + len(size_field)
    self.assertGreater(total_size, tail_length)
    with open(package, 'rb') as raw_fp:
      raw_fp.seek(offset + total_size - tail_length)
      proto_tail = raw_fp.read(tail_length)

    self.assertEqual(size_field, proto_tail[-len(size_field):])
    embedded_signature = proto_tail[:-len(size_field)]

    # The brillo_update_payload script extracts the metadata hash, which
    # serves as the oracle result.
    payload_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    metadata_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    hash_cmd = [
        'brillo_update_payload', 'hash',
        '--unsigned_payload', generator.payload_file,
        '--signature_size', str(self.SIGNATURE_SIZE),
        '--metadata_hash_file', metadata_hash_file,
        '--payload_hash_file', payload_hash_file,
    ]
    proc = common.Run(hash_cmd)
    stdoutdata, _ = proc.communicate()
    self.assertEqual(
        0, proc.returncode,
        'Failed to run brillo_update_payload:\n{}'.format(stdoutdata))

    signed_hash_file = signer.SignHashFile(metadata_hash_file)

    # The freshly signed hash must equal the signature found in the package.
    with open(signed_hash_file, 'rb') as signed_fp:
      self.assertEqual(signed_fp.read(), embedded_signature)

  @staticmethod
  def construct_zip_package_withValidPayload(with_metadata=False):
    """Returns a zip holding a generated, signed payload plus dummy entries.

    Args:
      with_metadata: If true, the two META-INF/com/android metadata entries
          are written as well.
    """
    # Cannot use construct_zip_package() since we need a "valid" payload.bin.
    target_files = construct_target_files()
    generator = PayloadGenerator()
    generator.Generate(target_files)

    signer = PayloadSigner()
    generator.Sign(signer)

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as package_fp:
      # The payload entries come first.
      generator.WriteToZip(package_fp)

      extra_names = ['care_map.txt', 'compatibility.zip']
      if with_metadata:
        extra_names += [
            'META-INF/com/android/metadata',
            'META-INF/com/android/metadata.pb',
        ]

      # Each extra entry's content is derived from its own name.
      for name in extra_names:
        content = name.replace('.', '-').upper()
        package_fp.writestr(name, content, zipfile.ZIP_STORED)

    return package

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Compute(self):
    """Computes the string for a package with a valid payload."""
    package = self.construct_zip_package_withValidPayload()
    subject = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      computed = subject.Compute(package_fp)

    parsed = self._parse_property_files_string(computed)
    # "7" includes the four entries above, two metadata entries, and one entry
    # for payload-metadata.bin.
    self.assertEqual(7, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Finalize(self):
    """Finalizes with exactly the length of the unpadded string."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    subject = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      unpadded = subject.GetPropertyFilesString(
          package_fp, reserve_space=False)
      finalized = subject.Finalize(package_fp, len(unpadded))

    parsed = self._parse_property_files_string(finalized)
    # "7" includes the four entries above, two metadata entries, and one entry
    # for payload-metadata.bin.
    self.assertEqual(7, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Verify(self):
    """Verifies the unpadded string against the package."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    subject = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r', allowZip64=True) as package_fp:
      unpadded = subject.GetPropertyFilesString(
          package_fp, reserve_space=False)

      subject.Verify(package_fp, unpadded)
1196
1197
class PayloadSignerTest(test_utils.ReleaseToolsTestCase):
  """Tests PayloadSigner construction and hash-file signing."""

  # File names, relative to the testdata directory, of the input hash file and
  # of the expected signing result.
  SIGFILE = 'sigfile.bin'
  SIGNED_SIGFILE = 'signed-sigfile.bin'

  def setUp(self):
    """Points the global signing options at the 'testkey' in testdata."""
    testdata_dir = test_utils.get_testdata_dir()
    self.testdata_dir = testdata_dir
    self.assertTrue(os.path.exists(testdata_dir))

    package_key = os.path.join(testdata_dir, 'testkey')
    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = []
    common.OPTIONS.package_key = package_key
    common.OPTIONS.key_passwords = {package_key: None}

  def _assertFilesEqual(self, file1, file2):
    """Asserts that the two files have identical binary content."""
    with open(file1, 'rb') as first_fp:
      with open(file2, 'rb') as second_fp:
        first_bytes = first_fp.read()
        second_bytes = second_fp.read()
    self.assertEqual(first_bytes, second_bytes)

  def _assertSignsToExpected(self, payload_signer):
    """Signs SIGFILE with payload_signer and compares to SIGNED_SIGFILE."""
    unsigned_file = os.path.join(self.testdata_dir, self.SIGFILE)
    signed_file = payload_signer.SignHashFile(unsigned_file)

    expected_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
    self._assertFilesEqual(expected_file, signed_file)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_init(self):
    """Checks the signer and signature size of a default PayloadSigner."""
    signer = PayloadSigner()
    self.assertEqual('openssl', signer.signer)
    self.assertEqual(256, signer.maximum_signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_init_withPassword(self):
    """Constructs a PayloadSigner for a key with password 'foo'."""
    package_key = os.path.join(self.testdata_dir, 'testkey_with_passwd')
    common.OPTIONS.package_key = package_key
    common.OPTIONS.key_passwords = {package_key: 'foo'}
    signer = PayloadSigner()
    self.assertEqual('openssl', signer.signer)

  def test_init_withExternalSigner(self):
    """Checks that external signer, args and size option are picked up."""
    common.OPTIONS.payload_signer_args = ['arg1', 'arg2']
    common.OPTIONS.payload_signer_maximum_signature_size = '512'
    signer = PayloadSigner(
        OPTIONS.package_key, OPTIONS.private_key_suffix, payload_signer='abc')
    self.assertEqual('abc', signer.signer)
    self.assertEqual(['arg1', 'arg2'], signer.signer_args)
    self.assertEqual(512, signer.maximum_signature_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMaximumSignatureSizeInBytes_512Bytes(self):
    """Expects 512 for the testkey_RSA4096.key key file."""
    key_file = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
    # pylint: disable=protected-access
    max_size = PayloadSigner._GetMaximumSignatureSizeInBytes(key_file)
    self.assertEqual(512, max_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetMaximumSignatureSizeInBytes_ECKey(self):
    """Expects 72 for the testkey_EC.key key file."""
    key_file = os.path.join(self.testdata_dir, 'testkey_EC.key')
    # pylint: disable=protected-access
    max_size = PayloadSigner._GetMaximumSignatureSizeInBytes(key_file)
    self.assertEqual(72, max_size)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign(self):
    """Signs with a default PayloadSigner."""
    self._assertSignsToExpected(PayloadSigner())

  def test_Sign_withExternalSigner_openssl(self):
    """Uses openssl as the external payload signer."""
    key_file = os.path.join(self.testdata_dir, 'testkey.pk8')
    common.OPTIONS.payload_signer_args = [
        'pkeyutl', '-sign', '-keyform', 'DER', '-inkey', key_file,
        '-pkeyopt', 'digest:sha256']
    signer = PayloadSigner(
        OPTIONS.package_key, OPTIONS.private_key_suffix,
        payload_signer="openssl")
    self._assertSignsToExpected(signer)

  def test_Sign_withExternalSigner_script(self):
    """Uses testdata/payload_signer.sh as the external payload signer."""
    script = os.path.join(self.testdata_dir, 'payload_signer.sh')
    # Set owner read/write/execute permission before the script is used.
    os.chmod(script, 0o700)
    key_file = os.path.join(self.testdata_dir, 'testkey.pk8')
    common.OPTIONS.payload_signer_args = [key_file]
    signer = PayloadSigner(
        OPTIONS.package_key, OPTIONS.private_key_suffix,
        payload_signer=script)
    self._assertSignsToExpected(signer)
1294
1295
class PayloadTest(test_utils.ReleaseToolsTestCase):
  """Tests generating, signing and zipping payloads via PayloadGenerator."""

  def setUp(self):
    """Points the global signing options at the 'testkey' in testdata."""
    testdata_dir = test_utils.get_testdata_dir()
    self.testdata_dir = testdata_dir
    self.assertTrue(os.path.exists(testdata_dir))

    package_key = os.path.join(testdata_dir, 'testkey')
    common.OPTIONS.wipe_user_data = False
    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = None
    common.OPTIONS.package_key = package_key
    common.OPTIONS.key_passwords = {package_key: None}

  @staticmethod
  def _create_payload_full(secondary=False):
    """Returns a PayloadGenerator after generating from one target-files."""
    target_files = construct_target_files(secondary)
    generator = PayloadGenerator(secondary, OPTIONS.wipe_user_data)
    generator.Generate(target_files)
    return generator

  @staticmethod
  def _create_payload_incremental():
    """Returns a PayloadGenerator generated from a target and a source."""
    target_files = construct_target_files()
    source_files = construct_target_files()
    generator = PayloadGenerator()
    generator.Generate(target_files, source_files)
    return generator

  @staticmethod
  def _write_to_temp_zip(payload):
    """Writes payload into a new temp zip and returns the zip's path."""
    zip_path = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as zip_fp:
      payload.WriteToZip(zip_fp)
    return zip_path

  def _assert_signed_payload_verifies(self, payload):
    """Signs payload, zips it, and verifies it with the testkey cert."""
    payload.Sign(PayloadSigner())
    zip_path = self._write_to_temp_zip(payload)

    # Imported here rather than at module level, as the tests always did.
    import check_ota_package_signature
    cert_file = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    check_ota_package_signature.VerifyAbOtaPayload(cert_file, zip_path)

  def _assert_entries_stored(self, zip_path, expected_names):
    """Asserts expected_names exist in the zip and are stored uncompressed."""
    with zipfile.ZipFile(zip_path) as verify_zip:
      # First make sure we have the essential entries.
      present_names = verify_zip.namelist()
      for name in expected_names:
        self.assertIn(name, present_names)

      # Then assert these entries are stored.
      for info in verify_zip.infolist():
        if info.filename in expected_names:
          self.assertEqual(zipfile.ZIP_STORED, info.compress_type)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_full(self):
    """Expects a payload file to exist after a full generation."""
    generator = self._create_payload_full()
    payload_exists = os.path.exists(generator.payload_file)
    self.assertTrue(payload_exists)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_incremental(self):
    """Expects a payload file to exist after an incremental generation."""
    generator = self._create_payload_incremental()
    payload_exists = os.path.exists(generator.payload_file)
    self.assertTrue(payload_exists)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_additionalArgs(self):
    """Passes the source image through additional_args."""
    target_files = construct_target_files()
    source_files = construct_target_files()
    generator = PayloadGenerator()
    # This should work the same as calling
    # generator.Generate(target_files, source_files).
    extra_args = ["--source_image", source_files]
    generator.Generate(target_files, additional_args=extra_args)
    self.assertTrue(os.path.exists(generator.payload_file))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Generate_invalidInput(self):
    """Expects ExternalError once IMAGES/vendor.img has been deleted."""
    target_files = construct_target_files()
    common.ZipDelete(target_files, 'IMAGES/vendor.img')
    generator = PayloadGenerator()
    with self.assertRaises(common.ExternalError):
      generator.Generate(target_files)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_full(self):
    """Signs a full payload and verifies the resulting package."""
    self._assert_signed_payload_verifies(self._create_payload_full())

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_incremental(self):
    """Signs an incremental payload and verifies the resulting package."""
    self._assert_signed_payload_verifies(self._create_payload_incremental())

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_withDataWipe(self):
    """Expects POWERWASH=1 in the properties when wipe_user_data is set."""
    common.OPTIONS.wipe_user_data = True
    generator = self._create_payload_full()
    generator.Sign(PayloadSigner())
    with tempfile.NamedTemporaryFile() as fp, zipfile.ZipFile(fp, "w") as zfp:
      generator.WriteToZip(zfp)

    with open(generator.payload_properties) as properties_fp:
      properties_text = properties_fp.read()
      self.assertIn("POWERWASH=1", properties_text)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_secondary(self):
    """Expects SWITCH_SLOT_ON_REBOOT=0 for a secondary payload."""
    generator = self._create_payload_full(secondary=True)
    generator.Sign(PayloadSigner())
    with tempfile.NamedTemporaryFile() as fp, zipfile.ZipFile(fp, "w") as zfp:
      generator.WriteToZip(zfp)

    with open(generator.payload_properties) as properties_fp:
      properties_text = properties_fp.read()
      self.assertIn("SWITCH_SLOT_ON_REBOOT=0", properties_text)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_Sign_badSigner(self):
    """Tests that signing failure can be captured."""
    generator = self._create_payload_full()
    signer = PayloadSigner()
    signer.signer_args.append('bad-option')
    with self.assertRaises(common.ExternalError):
      generator.Sign(signer)

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_WriteToZip(self):
    """Checks the primary payload entries exist and are stored."""
    generator = self._create_payload_full()
    generator.Sign(PayloadSigner())

    zip_path = self._write_to_temp_zip(generator)
    self._assert_entries_stored(
        zip_path,
        (PayloadGenerator.PAYLOAD_BIN,
         PayloadGenerator.PAYLOAD_PROPERTIES_TXT))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_WriteToZip_secondary(self):
    """Checks the secondary payload entries exist and are stored."""
    generator = self._create_payload_full(secondary=True)
    generator.Sign(PayloadSigner())

    zip_path = self._write_to_temp_zip(generator)
    self._assert_entries_stored(
        zip_path,
        (PayloadGenerator.SECONDARY_PAYLOAD_BIN,
         PayloadGenerator.SECONDARY_PAYLOAD_PROPERTIES_TXT))
1456
1457
1458class RuntimeFingerprintTest(test_utils.ReleaseToolsTestCase):
  # Lines of the META/misc_info.txt fixture. setUp() joins them with newlines
  # and writes the result into the per-test temp directory.
  MISC_INFO = [
      'recovery_api_version=3',
      'fstab_version=2',
      'recovery_as_boot=true',
      'ab_update=true',
  ]

  # key=value property lines carrying the ro.build.*, ro.system.build.* and
  # ro.product.system.* fixture values. Tests deep-copy this list before use.
  # NOTE(review): presumably written out as a build.prop file by the tests --
  # confirm against the test bodies.
  BUILD_PROP = [
      'ro.build.id=build-id',
      'ro.build.version.incremental=version-incremental',
      'ro.build.type=build-type',
      'ro.build.tags=build-tags',
      'ro.build.version.release=version-release',
      'ro.build.version.release_or_codename=version-release',
      'ro.build.version.sdk=30',
      'ro.build.version.security_patch=2020',
      'ro.build.date.utc=12345678',
      'ro.system.build.version.release=version-release',
      'ro.system.build.id=build-id',
      'ro.system.build.version.incremental=version-incremental',
      'ro.system.build.type=build-type',
      'ro.system.build.tags=build-tags',
      'ro.system.build.version.sdk=30',
      'ro.system.build.version.security_patch=2020',
      'ro.system.build.date.utc=12345678',
      'ro.product.system.brand=generic',
      'ro.product.system.name=generic',
      'ro.product.system.device=generic',
  ]

  # key=value property lines carrying the ro.vendor.build.* and
  # ro.product.vendor.* fixture values.
  # NOTE(review): presumably the vendor counterpart of BUILD_PROP -- confirm
  # against the test bodies.
  VENDOR_BUILD_PROP = [
      'ro.vendor.build.version.release=version-release',
      'ro.vendor.build.id=build-id',
      'ro.vendor.build.version.incremental=version-incremental',
      'ro.vendor.build.type=build-type',
      'ro.vendor.build.tags=build-tags',
      'ro.vendor.build.version.sdk=30',
      'ro.vendor.build.version.security_patch=2020',
      'ro.vendor.build.date.utc=12345678',
      'ro.product.vendor.brand=vendor-product-brand',
      'ro.product.vendor.name=vendor-product-name',
      'ro.product.vendor.device=vendor-product-device'
  ]
1502
1503  def setUp(self):
1504    common.OPTIONS.oem_dicts = None
1505    self.test_dir = common.MakeTempDir()
1506    self.writeFiles({'META/misc_info.txt': '\n'.join(self.MISC_INFO)},
1507                    self.test_dir)
1508
1509  def writeFiles(self, contents_dict, out_dir):
1510    for path, content in contents_dict.items():
1511      abs_path = os.path.join(out_dir, path)
1512      dir_name = os.path.dirname(abs_path)
1513      if not os.path.exists(dir_name):
1514        os.makedirs(dir_name)
1515      with open(abs_path, 'w') as f:
1516        f.write(content)
1517
1518  @staticmethod
1519  def constructFingerprint(prefix):
1520    return '{}:version-release/build-id/version-incremental:' \
1521           'build-type/build-tags'.format(prefix)
1522
1523  def test_CalculatePossibleFingerprints_no_dynamic_fingerprint(self):
1524    build_prop = copy.deepcopy(self.BUILD_PROP)
1525    build_prop.extend([
1526        'ro.product.brand=product-brand',
1527        'ro.product.name=product-name',
1528        'ro.product.device=product-device',
1529    ])
1530    self.writeFiles({
1531        'SYSTEM/build.prop': '\n'.join(build_prop),
1532        'VENDOR/build.prop': '\n'.join(self.VENDOR_BUILD_PROP),
1533    }, self.test_dir)
1534
1535    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1536    expected = ({'product-device'},
1537                {self.constructFingerprint(
1538                    'product-brand/product-name/product-device')})
1539    self.assertEqual(expected,
1540                     CalculateRuntimeDevicesAndFingerprints(build_info, {}))
1541
1542  def test_CalculatePossibleFingerprints_single_override(self):
1543    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1544    vendor_build_prop.extend([
1545        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1546    ])
1547    self.writeFiles({
1548        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1549        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1550        'VENDOR/etc/build_std.prop':
1551        'ro.product.vendor.name=vendor-product-std',
1552        'VENDOR/etc/build_pro.prop':
1553        'ro.product.vendor.name=vendor-product-pro',
1554    }, self.test_dir)
1555
1556    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1557    boot_variable_values = {'ro.boot.sku_name': ['std', 'pro']}
1558
1559    expected = ({'vendor-product-device'}, {
1560        self.constructFingerprint(
1561            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1562        self.constructFingerprint(
1563            'vendor-product-brand/vendor-product-std/vendor-product-device'),
1564        self.constructFingerprint(
1565            'vendor-product-brand/vendor-product-pro/vendor-product-device'),
1566    })
1567    self.assertEqual(
1568        expected, CalculateRuntimeDevicesAndFingerprints(
1569            build_info, boot_variable_values))
1570
1571  def test_CalculatePossibleFingerprints_multiple_overrides(self):
1572    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1573    vendor_build_prop.extend([
1574        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1575        'import /vendor/etc/build_${ro.boot.device_name}.prop',
1576    ])
1577    self.writeFiles({
1578        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1579        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1580        'VENDOR/etc/build_std.prop':
1581        'ro.product.vendor.name=vendor-product-std',
1582        'VENDOR/etc/build_product1.prop':
1583        'ro.product.vendor.device=vendor-device-product1',
1584        'VENDOR/etc/build_pro.prop':
1585        'ro.product.vendor.name=vendor-product-pro',
1586        'VENDOR/etc/build_product2.prop':
1587        'ro.product.vendor.device=vendor-device-product2',
1588    }, self.test_dir)
1589
1590    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1591    boot_variable_values = {
1592        'ro.boot.sku_name': ['std', 'pro'],
1593        'ro.boot.device_name': ['product1', 'product2'],
1594    }
1595
1596    expected_devices = {'vendor-product-device', 'vendor-device-product1',
1597                        'vendor-device-product2'}
1598    expected_fingerprints = {
1599        self.constructFingerprint(
1600            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1601        self.constructFingerprint(
1602            'vendor-product-brand/vendor-product-std/vendor-device-product1'),
1603        self.constructFingerprint(
1604            'vendor-product-brand/vendor-product-pro/vendor-device-product1'),
1605        self.constructFingerprint(
1606            'vendor-product-brand/vendor-product-std/vendor-device-product2'),
1607        self.constructFingerprint(
1608            'vendor-product-brand/vendor-product-pro/vendor-device-product2')
1609    }
1610    self.assertEqual((expected_devices, expected_fingerprints),
1611                     CalculateRuntimeDevicesAndFingerprints(
1612                         build_info, boot_variable_values))
1613
1614  def test_GetPackageMetadata_full_package(self):
1615    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1616    vendor_build_prop.extend([
1617        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1618    ])
1619    self.writeFiles({
1620        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1621        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1622        'VENDOR/etc/build_std.prop':
1623        'ro.product.vendor.name=vendor-product-std',
1624        'VENDOR/etc/build_pro.prop':
1625        'ro.product.vendor.name=vendor-product-pro',
1626        AB_PARTITIONS: '\n'.join(['system', 'vendor']),
1627    }, self.test_dir)
1628
1629    common.OPTIONS.boot_variable_file = common.MakeTempFile()
1630    with open(common.OPTIONS.boot_variable_file, 'w') as f:
1631      f.write('ro.boot.sku_name=std,pro')
1632
1633    build_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1634    metadata_dict = BuildLegacyOtaMetadata(GetPackageMetadata(build_info))
1635    self.assertEqual('vendor-product-device', metadata_dict['pre-device'])
1636    fingerprints = [
1637        self.constructFingerprint(
1638            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1639        self.constructFingerprint(
1640            'vendor-product-brand/vendor-product-pro/vendor-product-device'),
1641        self.constructFingerprint(
1642            'vendor-product-brand/vendor-product-std/vendor-product-device'),
1643    ]
1644    self.assertEqual('|'.join(fingerprints), metadata_dict['post-build'])
1645
1646  def CheckMetadataEqual(self, metadata_dict, metadata_proto):
1647    post_build = metadata_proto.postcondition
1648    self.assertEqual('|'.join(post_build.build),
1649                     metadata_dict['post-build'])
1650    self.assertEqual(post_build.build_incremental,
1651                     metadata_dict['post-build-incremental'])
1652    self.assertEqual(post_build.sdk_level,
1653                     metadata_dict['post-sdk-level'])
1654    self.assertEqual(post_build.security_patch_level,
1655                     metadata_dict['post-security-patch-level'])
1656
1657    if metadata_proto.type == ota_metadata_pb2.OtaMetadata.AB:
1658      ota_type = 'AB'
1659    elif metadata_proto.type == ota_metadata_pb2.OtaMetadata.BLOCK:
1660      ota_type = 'BLOCK'
1661    else:
1662      ota_type = ''
1663    self.assertEqual(ota_type, metadata_dict['ota-type'])
1664    self.assertEqual(metadata_proto.wipe,
1665                     metadata_dict.get('ota-wipe') == 'yes')
1666    self.assertEqual(metadata_proto.required_cache,
1667                     int(metadata_dict.get('ota-required-cache', 0)))
1668    self.assertEqual(metadata_proto.retrofit_dynamic_partitions,
1669                     metadata_dict.get(
1670                         'ota-retrofit-dynamic-partitions') == 'yes')
1671
1672  def test_GetPackageMetadata_incremental_package(self):
1673    vendor_build_prop = copy.deepcopy(self.VENDOR_BUILD_PROP)
1674    vendor_build_prop.extend([
1675        'import /vendor/etc/build_${ro.boot.sku_name}.prop',
1676    ])
1677    self.writeFiles({
1678        'META/misc_info.txt': '\n'.join(self.MISC_INFO),
1679        'META/ab_partitions.txt': '\n'.join(['system', 'vendor', 'product']),
1680        'SYSTEM/build.prop': '\n'.join(self.BUILD_PROP),
1681        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1682        'VENDOR/etc/build_std.prop':
1683        'ro.product.vendor.device=vendor-device-std',
1684        'VENDOR/etc/build_pro.prop':
1685        'ro.product.vendor.device=vendor-device-pro',
1686    }, self.test_dir)
1687
1688    common.OPTIONS.boot_variable_file = common.MakeTempFile()
1689    with open(common.OPTIONS.boot_variable_file, 'w') as f:
1690      f.write('ro.boot.sku_name=std,pro')
1691
1692    source_dir = common.MakeTempDir()
1693    source_build_prop = [
1694        'ro.build.version.release=source-version-release',
1695        'ro.build.id=source-build-id',
1696        'ro.build.version.incremental=source-version-incremental',
1697        'ro.build.type=build-type',
1698        'ro.build.tags=build-tags',
1699        'ro.build.version.sdk=29',
1700        'ro.build.version.security_patch=2020',
1701        'ro.build.date.utc=12340000',
1702        'ro.system.build.version.release=source-version-release',
1703        'ro.system.build.id=source-build-id',
1704        'ro.system.build.version.incremental=source-version-incremental',
1705        'ro.system.build.type=build-type',
1706        'ro.system.build.tags=build-tags',
1707        'ro.system.build.version.sdk=29',
1708        'ro.system.build.version.security_patch=2020',
1709        'ro.system.build.date.utc=12340000',
1710        'ro.product.system.brand=generic',
1711        'ro.product.system.name=generic',
1712        'ro.product.system.device=generic',
1713    ]
1714    self.writeFiles({
1715        'META/misc_info.txt': '\n'.join(self.MISC_INFO),
1716        'META/ab_partitions.txt': '\n'.join(['system', 'vendor', 'product']),
1717        'SYSTEM/build.prop': '\n'.join(source_build_prop),
1718        'VENDOR/build.prop': '\n'.join(vendor_build_prop),
1719        'VENDOR/etc/build_std.prop':
1720        'ro.product.vendor.device=vendor-device-std',
1721        'VENDOR/etc/build_pro.prop':
1722        'ro.product.vendor.device=vendor-device-pro',
1723    }, source_dir)
1724    common.OPTIONS.incremental_source = source_dir
1725
1726    target_info = common.BuildInfo(common.LoadInfoDict(self.test_dir))
1727    source_info = common.BuildInfo(common.LoadInfoDict(source_dir))
1728
1729    metadata_proto = GetPackageMetadata(target_info, source_info)
1730    metadata_dict = BuildLegacyOtaMetadata(metadata_proto)
1731    self.assertEqual(
1732        'vendor-device-pro|vendor-device-std|vendor-product-device',
1733        metadata_dict['pre-device'])
1734    source_suffix = ':source-version-release/source-build-id/' \
1735                    'source-version-incremental:build-type/build-tags'
1736    pre_fingerprints = [
1737        'vendor-product-brand/vendor-product-name/vendor-device-pro'
1738        '{}'.format(source_suffix),
1739        'vendor-product-brand/vendor-product-name/vendor-device-std'
1740        '{}'.format(source_suffix),
1741        'vendor-product-brand/vendor-product-name/vendor-product-device'
1742        '{}'.format(source_suffix),
1743    ]
1744    self.assertEqual('|'.join(pre_fingerprints), metadata_dict['pre-build'])
1745
1746    post_fingerprints = [
1747        self.constructFingerprint(
1748            'vendor-product-brand/vendor-product-name/vendor-device-pro'),
1749        self.constructFingerprint(
1750            'vendor-product-brand/vendor-product-name/vendor-device-std'),
1751        self.constructFingerprint(
1752            'vendor-product-brand/vendor-product-name/vendor-product-device'),
1753    ]
1754    self.assertEqual('|'.join(post_fingerprints), metadata_dict['post-build'])
1755
1756    self.CheckMetadataEqual(metadata_dict, metadata_proto)
1757
1758    pre_partition_states = metadata_proto.precondition.partition_state
1759    self.assertEqual(2, len(pre_partition_states))
1760    self.assertEqual('system', pre_partition_states[0].partition_name)
1761    self.assertEqual(['generic'], pre_partition_states[0].device)
1762    self.assertEqual(['generic/generic/generic{}'.format(source_suffix)],
1763                     pre_partition_states[0].build)
1764
1765    self.assertEqual('vendor', pre_partition_states[1].partition_name)
1766    self.assertEqual(['vendor-device-pro', 'vendor-device-std',
1767                      'vendor-product-device'], pre_partition_states[1].device)
1768    vendor_fingerprints = post_fingerprints
1769    self.assertEqual(vendor_fingerprints, pre_partition_states[1].build)
1770
1771    post_partition_states = metadata_proto.postcondition.partition_state
1772    self.assertEqual(2, len(post_partition_states))
1773    self.assertEqual('system', post_partition_states[0].partition_name)
1774    self.assertEqual(['generic'], post_partition_states[0].device)
1775    self.assertEqual([self.constructFingerprint('generic/generic/generic')],
1776                     post_partition_states[0].build)
1777
1778    self.assertEqual('vendor', post_partition_states[1].partition_name)
1779    self.assertEqual(['vendor-device-pro', 'vendor-device-std',
1780                      'vendor-product-device'], post_partition_states[1].device)
1781    self.assertEqual(vendor_fingerprints, post_partition_states[1].build)
1782