1#
2# Copyright (C) 2020 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import os
18import logging
19import pkgutil
20from importlib import resources
21
22import ltp_configs
23import ltp_enums
24import test_case
25from configs import stable_tests
26from configs import disabled_tests
27from common import filter_utils
28from typing import Set, Optional, List, Callable
29
# XML line emitted for a single LTP test. The two "%s" slots receive, in
# order, the test key (display name) and the shell command to run.
ltp_test_template = (
    '        <option name="test-command-line" key="%s" value="&env_setup_cmd; ;'
    ' cd &ltp_bin_dir; ; %s" />'
)
32
class LtpTestCases(object):
    """Build an XML LTP test config from the bundled LTP definitions.

    The class reads the packaged runtest scenario files, the list of LTP
    binaries (gen.bp) and the config template, applies the stable / disabled /
    staging filters, and writes the resulting config file.

    Attributes:
        _filter_func: function, a filter method that will emit exception if a test is filtered
        _ltp_tests_filter: Filter object built from the stable test names
                           (include) and the disabled tests (exclude)
        _ltp_binaries: list of string, all ltp binaries declared in gen.bp
        _ltp_config_lines: list of string, initialised empty; no method of
                           this class populates it
    """

    def __init__(self, filter_func: Callable):
        """Create the stable/disabled filter and the empty binary cache.

        Args:
            filter_func: callable invoked with a test display name; it is
                         expected to raise when the test is filtered.
        """
        self._filter_func = filter_func
        self._ltp_tests_filter = filter_utils.Filter(
            set(stable_tests.STABLE_TESTS.keys()),
            disabled_tests.DISABLED_TESTS,
            enable_regex=True)
        self._ltp_tests_filter.ExpandBitness()
        self._ltp_binaries = []
        self._ltp_config_lines = []

    def ValidateDefinition(self, line: str) -> Optional[List[str]]:
        """Validate a delimited test case definition.

        The line is split on ltp_enums.Delimiters.TESTCASE_DEFINITION and
        must yield exactly three parts. Leading and trailing white space is
        trimmed from every part.

        Args:
            line: string, one test case definition line

        Returns:
            A list in format [test suite, test name, test command] if the
            definition is valid. None otherwise.
        """
        items = [
            item.strip()
            for item in line.split(ltp_enums.Delimiters.TESTCASE_DEFINITION)
        ]
        # str.split() always returns at least one element, so the length
        # check is the only condition that can reject a line.
        return items if len(items) == 3 else None

    def ReadConfigTemplateFile(self) -> str:
        """Read the template of the config file and return its content.

        Returns:
            String, the decoded content of template.xml.
        """
        # This gets bundled into the gen_ltp_config tool during the build
        return pkgutil.get_data('template', 'template.xml').decode()

    def GetKernelModuleControllerOption(self, arch: str, n_bit: int, is_low_mem: bool = False, is_hwasan: bool = False) -> str:
        """Get the Option of KernelModuleController.

        Args:
            arch: String, arch
            n_bit: int, bitness; the string form (e.g. '64') is accepted too
            is_low_mem: bool, whether to use low memory device configuration
            is_hwasan: bool, whether to use hwasan configuration

        Returns:
            String, three <option> lines (arch, is-low-mem, is-hwasan)
            without a trailing newline.
        """
        option_lines = (
            '        <option name="arch" value="{}"/>\n'
            '        <option name="is-low-mem" value="{}"/>\n'
            '        <option name="is-hwasan" value="{}"/>')
        # The parameter is annotated as int, yet the bitness used to be
        # compared against the string '64' only; normalise so that both
        # 64 and '64' select the 64-bit arch name.
        if str(n_bit) == '64':
            # arm/riscv append the bitness directly ("arm64"); every other
            # arch uses an underscore ("x86_64").
            n_bit_string = '64' if arch in ('arm', 'riscv') else '_64'
        else:
            n_bit_string = ''
        return option_lines.format(arch + n_bit_string,
                                   'true' if is_low_mem else 'false',
                                   'true' if is_hwasan else 'false')

    def GetLtpBinaries(self):
        """Load the names of the LTP binaries declared in the bundled gen.bp.

        Every line starting with "stem:" or "filename:" contributes the text
        between its first pair of double quotes. The result replaces
        self._ltp_binaries, so calling this method repeatedly does not
        accumulate duplicates.
        """
        binaries = []
        for line in pkgutil.get_data('android.tools', 'gen.bp').decode().splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            if line.startswith(('stem:', 'filename:')):
                parts = line.split('"')
                if len(parts) < 2:
                    # No quoted value on this line; skip it instead of
                    # failing with an IndexError.
                    logging.warning("Cannot parse binary name from '%s'", line)
                    continue
                binaries.append(parts[1])
        self._ltp_binaries = binaries

    def IsLtpBinaryExist(self, commands: str) -> bool:
        """Check whether any binary used by the command is a known LTP binary.

        The command string is split on ';' and the first space-separated
        word of each part is looked up in self._ltp_binaries.

        Args:
            commands: String, the test command (possibly several commands
                      separated by ';')

        Returns:
            bool: True if at least one binary is in the gen.bp
        """
        for cmd in commands.split(';'):
            binary_name = cmd.strip().split(' ')[0]
            if binary_name in self._ltp_binaries:
                return True
        logging.info("Ltp binary not exist in cmd of '%s'", commands)
        return False

    def GenConfig(self,
             arch: str,
             n_bit: int,
             test_filter: "filter_utils.Filter",
             output_file: str,
             run_staging: bool = False,
             is_low_mem: bool = False,
             is_hwasan: bool = False):
        """Read the definition file and generate the test config.

        The template placeholders nativetest_bit_path,
        module_controller_option, mandatory_test_cases and
        skippable_test_cases are filled in and the result is written to
        output_file.

        Args:
            arch: String, arch
            n_bit: int, bitness; the string form (e.g. '64') is accepted too
            test_filter: Filter object, test name filter from base_test
            output_file: String, the file path of the generating config
            run_staging: bool, whether to use staging configuration
            is_low_mem: bool, whether to use low memory device configuration
            is_hwasan: bool, whether to use hwasan configuration
        """
        self.GetLtpBinaries()
        scenario_groups = (ltp_configs.TEST_SUITES_LOW_MEM
                           if is_low_mem else ltp_configs.TEST_SUITES)
        logging.info('LTP scenario groups: %s', scenario_groups)
        config_lines = self.ReadConfigTemplateFile()
        module_controller_option = self.GetKernelModuleControllerOption(
            arch, n_bit, is_low_mem, is_hwasan)
        low_mem_suffix = ltp_configs.LOW_MEMORY_SCENARIO_GROUP_SUFFIX
        mandatory_test_cases = []
        skippable_test_cases = []
        for line in self.GenerateLtpRunScript(scenario_groups):
            items = self.ValidateDefinition(line)
            if not items:
                continue

            testsuite, testname, command = items
            # The extra emptiness check keeps an empty suffix from erasing
            # the whole suite name through testsuite[:-0].
            if (is_low_mem and low_mem_suffix
                    and testsuite.endswith(low_mem_suffix)):
                testsuite = testsuite[:-len(low_mem_suffix)]

            # Tests failed to build will have prefix "DISABLED_"
            if testname.startswith("DISABLED_"):
                logging.info("[Parser] Skipping test case %s-%s. Reason: "
                             "not built", testsuite, testname)
                continue

            # Some test cases have hardcoded "/tmp" in the command
            # we replace that with ltp_configs.TMPDIR
            command = command.replace('/tmp', ltp_configs.TMPDIR)

            testcase = test_case.TestCase(
                testsuite=testsuite, testname=testname, command=command)
            test_display_name = "{}_{}bit".format(str(testcase), n_bit)

            # Check runner's base_test filtering method. A raised exception
            # only marks the test case; it is not dropped here.
            try:
                self._filter_func(test_display_name)
            except Exception:
                logging.info("[Parser] Skipping test case %s. Reason: "
                             "filtered", testcase.fullname)
                testcase.is_filtered = True
                testcase.note = "filtered"

            logging.info('ltp_test_cases Load(): test_display_name = %s\n'
                         'cmd = %s', test_display_name, command)

            # For skipping tests that are not designed or ready for Android,
            # check for bit specific test in disabled list as well as non-bit specific
            is_disabled = (
                self._ltp_tests_filter.IsInExcludeFilter(str(testcase)) or
                self._ltp_tests_filter.IsInExcludeFilter(test_display_name))
            if is_disabled and not test_filter.IsInIncludeFilter(
                    test_display_name):
                logging.info("[Parser] Skipping test case %s. Reason: "
                             "disabled", testcase.fullname)
                continue

            # For separating staging tests from stable tests
            if not self._ltp_tests_filter.IsInIncludeFilter(test_display_name):
                if not run_staging and not test_filter.IsInIncludeFilter(
                        test_display_name):
                    # Skip staging tests in stable run
                    continue
                testcase.is_staging = True
                testcase.note = "staging"
            elif run_staging:
                # Skip stable tests in staging run
                continue

            if not testcase.is_staging:
                if stable_tests.STABLE_TESTS.get(test_display_name, False):
                    testcase.is_mandatory = True

            if (is_hwasan and
                    test_display_name in disabled_tests.DISABLED_TESTS_HWASAN):
                continue

            if not self.IsLtpBinaryExist(command):
                continue

            logging.info("[Parser] Adding test case %s.", testcase.fullname)
            # Some test cases contain semicolons in their commands,
            # and we replace them with &&
            # NOTE(review): other XML-special characters in the command are
            # written as-is; confirm no definition contains any.
            command = command.replace(';', '&amp;&amp;')
            # Replace the original command with '/data/local/tmp/ltp'
            # e.g. mm.mmapstress07
            command = command.replace(ltp_configs.LTPDIR, '&ltp_dir;')
            ltp_test_line = ltp_test_template % (test_display_name, command)
            if testcase.is_mandatory:
                mandatory_test_cases.append(ltp_test_line)
            else:
                skippable_test_cases.append(ltp_test_line)

        # Same normalisation as in GetKernelModuleControllerOption so that an
        # int bitness of 64 also selects the 64-bit native test path.
        nativetest_bit_path = '64' if str(n_bit) == '64' else ''
        config_lines = config_lines.format(
            nativetest_bit_path=nativetest_bit_path,
            module_controller_option=module_controller_option,
            mandatory_test_cases='\n'.join(mandatory_test_cases),
            skippable_test_cases='\n'.join(skippable_test_cases))
        # The inputs are decoded with the default UTF-8 codec, so write the
        # output with the same encoding instead of the locale default.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(config_lines)

    def GenerateLtpTestCases(self, testsuite: str, disabled_tests_list: List[str]) -> List[str]:
        '''Generate test cases for each ltp test suite.

        Blank lines and lines starting with '#' in the bundled runtest file
        are ignored. The first word of every remaining line is the test
        name, the rest of the line is its command.

        Args:
            testsuite: string, test suite name
            disabled_tests_list: collection of string, test names that get
                                 the "DISABLED_" prefix; only membership
                                 tests are performed on it

        Returns:
            A list of string, each "<suite>\\t<test name>\\t<command>"
        '''
        result = []
        for line in pkgutil.get_data('runtest', testsuite).decode().splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            testname = line.split()[0]
            # Take the command before the name may receive its prefix, as
            # the slice relies on the length of the unmodified name.
            command = line[len(testname):].strip()
            if testname in disabled_tests_list:
                testname = 'DISABLED_' + testname

            result.append("\t".join([testsuite, testname, command]))
        return result

    def GenerateLtpRunScript(self, scenario_groups: List[str]) -> List[str]:
        '''Given a scenario group generate test case script.

        Args:
            scenario_groups: list of string, name of test scenario groups to use

        Returns:
            A list of string, the test case definitions of all groups in
            the given order
        '''
        raw_lines = pkgutil.get_data(
            'android.tools', 'disabled_tests.txt').decode().splitlines()
        # Blank lines and '#' comments carry no test name.
        disabled_tests_list = {
            name for name in (raw.strip() for raw in raw_lines)
            if name and not name.startswith('#')
        }

        result = []
        for testsuite in scenario_groups:
            result.extend(
                self.GenerateLtpTestCases(testsuite, disabled_tests_list))
        return result
308