1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Unittests for parsing files in zip64 format"""
19
20import os
21import subprocess
22import tempfile
23import unittest
24import zipfile
25import time
26
27class Zip64Test(unittest.TestCase):
28  @staticmethod
29  def _WriteFile(path, size_in_kib):
30    contents = b'X' * 1024
31    with open(path, 'wb') as f:
32      for i in range(size_in_kib):
33        f.write(contents)
34
35  @staticmethod
36  def _AddEntriesToZip(output_zip, entries_dict=None):
37    contents = b'X' * 1024
38    for name, size in entries_dict.items():
39      # Need to pass a ZipInfo with a file_size
40      # to .open() so that it adds the Zip64 header
41      # on larger files
42      info = zipfile.ZipInfo(name)
43      info.file_size = size * 1024
44      with output_zip.open(info, mode='w') as f:
45        for i in range(size):
46          f.write(contents)
47
48  def _getEntryNames(self, zip_name):
49    cmd = ['ziptool', 'zipinfo', '-1', zip_name]
50    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
51    output, _ = proc.communicate()
52    self.assertEqual(0, proc.returncode)
53    self.assertNotEqual(None, output)
54    return output.decode('utf-8').split()
55
56  def _ExtractEntries(self, zip_name):
57    with tempfile.TemporaryDirectory() as temp_dir:
58      cmd = ['ziptool', 'unzip', '-d', temp_dir, zip_name]
59      proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
60      proc.communicate()
61      self.assertEqual(0, proc.returncode)
62
63  def test_entriesSmallerThan2G(self):
64    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
65      # Add a few entries with each of them smaller than 2GiB. But the entire zip file is larger
66      # than 4GiB in size.
67      with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
68        entry_dict = {'a.txt': 1025 * 1024, 'b.txt': 1025 * 1024, 'c.txt': 1025 * 1024,
69                      'd.txt': 1025 * 1024, 'e.txt': 1024}
70        self._AddEntriesToZip(output_zip, entry_dict)
71
72      read_names = self._getEntryNames(zip_path.name)
73      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
74      self._ExtractEntries(zip_path.name)
75
76
77  def test_largeNumberOfEntries(self):
78    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
79      entry_dict = {}
80      # Add 100k entries (more than 65535|UINT16_MAX).
81      # We use empty files so that we don't hit any of the _other_ EOCD limits
82      # and appear to be testing the file count when we're actually not.
83      for num in range(0, 100 * 1024):
84        entry_dict[str(num)] = 0
85
86      with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
87        self._AddEntriesToZip(output_zip, entry_dict)
88
89      read_names = self._getEntryNames(zip_path.name)
90      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
91      self._ExtractEntries(zip_path.name)
92
93
94  def test_largeCompressedEntriesSmallerThan4G(self):
95    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
96      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED,
97                           allowZip64=True) as output_zip:
98        # Add entries close to 4GiB in size. Somehow the python library will put the (un)compressed
99        # sizes in the extra field. Test if our ziptool should be able to parse it.
100        entry_dict = {'e.txt': 4095 * 1024, 'f.txt': 4095 * 1024}
101        self._AddEntriesToZip(output_zip, entry_dict)
102
103      read_names = self._getEntryNames(zip_path.name)
104      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
105      self._ExtractEntries(zip_path.name)
106
107
108  def test_forceDataDescriptor(self):
109    with tempfile.NamedTemporaryFile(suffix='.txt') as file_path:
110      self._WriteFile(file_path.name, 5000 * 1024)
111
112      with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
113        with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
114          pass
115        # The fd option force writes a data descriptor
116        cmd = ['zip', '-fd', zip_path.name, file_path.name]
117        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
118        proc.communicate()
119        read_names = self._getEntryNames(zip_path.name)
120        self.assertEqual([file_path.name[1:]], read_names)
121        self._ExtractEntries(zip_path.name)
122
123
124  def test_largeUncompressedEntriesLargerThan4G(self):
125    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
126      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED,
127                           allowZip64=True) as output_zip:
128        # Add entries close to 4GiB in size. Somehow the python library will put the (un)compressed
129        # sizes in the extra field. Test if our ziptool should be able to parse it.
130        entry_dict = {'g.txt': 5000 * 1024, 'h.txt': 6000 * 1024}
131        self._AddEntriesToZip(output_zip, entry_dict)
132
133      read_names = self._getEntryNames(zip_path.name)
134      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
135      self._ExtractEntries(zip_path.name)
136
137
138  def test_largeCompressedEntriesLargerThan4G(self):
139    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
140      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED,
141                           allowZip64=True) as output_zip:
142        # Add entries close to 4GiB in size. Somehow the python library will put the (un)compressed
143        # sizes in the extra field. Test if our ziptool should be able to parse it.
144        entry_dict = {'i.txt': 4096 * 1024, 'j.txt': 7000 * 1024}
145        self._AddEntriesToZip(output_zip, entry_dict)
146
147      read_names = self._getEntryNames(zip_path.name)
148      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
149      self._ExtractEntries(zip_path.name)
150
151
# Script entry point: run every test in this module with per-test output.
if __name__ == "__main__":
  unittest.main(verbosity=2)
154