1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import gzip
19import logging
20import os
21import shlex
22import subprocess
23import tempfile
24
25
class AndroidDevice(object):
    """Controls a device via adb or shell commands.

    With a non-empty serial number, commands are sent to the device through
    `adb -s <serial> shell`. With an empty serial number, commands are run
    through the local shell, i.e., the process is expected to run on the
    Android device itself.
    """

    def __init__(self, serial_number=None):
        """Initialize the serial number.

        A non-empty serial number indicates that this process runs on a host
        and controls the devices via adb. An empty serial number indicates that
        this process runs on an Android device and executes shell commands.

        Args:
            serial_number: A string or None.
        """
        self._serial_number = serial_number

    @property
    def _adb_mode(self):
        """Returns whether this object executes adb commands."""
        return bool(self._serial_number)

    def AdbPull(self, src, dst):
        """Copies a file from the device to the host with `adb pull`.

        ADB_COMPRESSION is set to "0" in the environment of the adb process
        unless the caller's environment already defines it.

        Args:
            src: A string, the path on the device.
            dst: A string, the destination path on the host.

        Raises:
            NotImplementedError if this object has no serial number.
            subprocess.CalledProcessError if adb exits with non-zero status.
        """
        if not self._adb_mode:
            raise NotImplementedError("Cannot execute `adb pull` on device.")
        cmd = ["adb", "-s", self._serial_number, "pull", src, dst]
        env = os.environ.copy()
        # Respect a value chosen by the caller; only supply the default.
        if "ADB_COMPRESSION" not in env:
            env["ADB_COMPRESSION"] = "0"
        # The output is captured so that it is attached to the
        # CalledProcessError raised by check=True.
        subprocess.run(cmd, shell=False, env=env, check=True,
                       stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)

    def Execute(self, *args):
        """Executes a command.

        In adb mode, the arguments are appended to `adb -s <serial> shell`.
        Otherwise, the arguments are joined with spaces and run through the
        local shell.

        The caller should escape special characters.

        Args:
            args: Strings, the arguments.

        Returns:
            Stdout as a string, stderr as a string, and return code as an
            integer.
        """
        if self._adb_mode:
            cmd = ["adb", "-s", self._serial_number, "shell"]
            cmd.extend(args)
        else:
            cmd = " ".join(args)

        # A list is executed directly (adb mode); a string needs the shell.
        proc = subprocess.Popen(cmd, shell=not self._adb_mode,
                                stdin=subprocess.DEVNULL,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate()
        # The pipes are opened in binary mode, so both streams are bytes.
        return out.decode("utf-8"), err.decode("utf-8"), proc.returncode

    def _GetProp(self, name):
        """Gets an Android system property.

        Args:
            name: A string, the property name.

        Returns:
            A string, the value of the property with surrounding whitespace
            removed.

        Raises:
            IOError if the command fails or writes to stderr.
        """
        out, err, return_code = self.Execute("getprop", name)
        if err.strip() or return_code != 0:
            raise IOError("`getprop %s` stdout: %s\nstderr: %s" %
                          (name, out, err))
        return out.strip()

    def GetCpuAbiList(self, bitness=""):
        """Gets the list of supported ABIs from property.

        Args:
            bitness: 32 or 64. If the argument is not specified, this method
                     returns both 32 and 64-bit ABIs.

        Returns:
            A list of lower-case strings, the supported ABIs. The list is
            empty if the property is empty.

        Raises:
            IOError if the property cannot be read.
        """
        out = self._GetProp("ro.product.cpu.abilist" + str(bitness))
        return out.lower().split(",") if out else []

    def GetLaunchApiLevel(self):
        """Gets the API level that the device was initially launched with.

        This method reads ro.product.first_api_level from the device. If the
        value is 0, it then reads ro.build.version.sdk.

        Returns:
            An integer, the API level.

        Raises:
            ValueError if the property value is not an integer.
            IOError if the property cannot be read.
        """
        level_str = self._GetProp("ro.product.first_api_level")
        level = int(level_str)
        if level != 0:
            return level

        level_str = self._GetProp("ro.build.version.sdk")
        return int(level_str)

    def getLaunchApiLevel(self, strict=True):
        """Gets the API level that the device was initially launched with.

        This method is compatible with vndk_utils in vts package.

        Args:
            strict: A boolean, whether to raise an error if the property is
                    not an integer or not defined.

        Returns:
            An integer, the API level.
            0 if the value is undefined and strict is False.

        Raises:
            ValueError: if the value is undefined and strict is True.
            IOError: if the property cannot be read, regardless of strict.
        """
        try:
            return self.GetLaunchApiLevel()
        except ValueError as e:
            if strict:
                raise
            logging.exception(e)
            return 0

    @property
    def vndk_lite(self):
        """Checks whether the vendor partition requests lite VNDK enforcement.

        This method is compatible with vndk_utils in vts package.

        Returns:
            A boolean, True for lite vndk enforcement.
        """
        return self._GetProp("ro.vndk.lite").lower() == "true"

    def GetVndkVersion(self):
        """Gets the VNDK version that the vendor partition requests.

        Returns:
            A string, the value of ro.vndk.version.
        """
        return self._GetProp("ro.vndk.version")

    def GetKernelConfig(self, config_name):
        """Gets kernel config from the device.

        This method pulls /proc/config.gz to a temporary file on the host,
        searches it for `config_name=`, and removes the temporary file.

        Args:
            config_name: A string, the name of the configuration.

        Returns:
            "y" or "m" if the config is set.
            "" if the config is not set.
            None if fails to read config.

        Raises:
            NotImplementedError if this object has no serial number.
        """
        line_prefix = config_name + "="
        # Only the path is needed. The file is closed immediately so that
        # adb can overwrite it, and it is removed in the finally clause.
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            config_path = temp_file.name
        try:
            logging.debug("Pull config.gz to %s", config_path)
            self.AdbPull("/proc/config.gz", config_path)
            with gzip.open(config_path, "rt") as config_file:
                for line in config_file:
                    stripped_line = line.strip()
                    if stripped_line.startswith(line_prefix):
                        logging.debug("Found config: %s", line)
                        return stripped_line[len(line_prefix):]
            logging.debug("%s is not set.", config_name)
            return ""
        except (subprocess.CalledProcessError, IOError):
            # logging.exception records the active traceback by itself.
            # Passing the exception as an extra argument would be treated as
            # a %-format argument and break the formatting of the message.
            logging.exception("Cannot read kernel config.")
            return None
        finally:
            os.remove(config_path)

    def GetBinderBitness(self):
        """Returns the value of BINDER_IPC_32BIT in kernel config.

        Returns:
            32 or 64, binder bitness of the device.
            None if fails to read config.
        """
        config_value = self.GetKernelConfig("CONFIG_ANDROID_BINDER_IPC_32BIT")
        if config_value is None:
            return None
        # Any non-empty value means the 32-bit option is set.
        elif config_value:
            return 32
        else:
            return 64

    def GetLlndkList(self):
        """Loads the list of LLNDK library names from the device.

        Returns:
            A list of strings, the library names including ".so".

        Raises:
            IOError if the command fails or writes to stderr.
        """
        out, err, return_code = self.Execute("cat",
                                             "/system/etc/llndk.libraries.txt")
        if err.strip() or return_code != 0:
            raise IOError("`cat /system/etc/llndk.libraries.txt` "
                          f"stdout: {out}\nstderr: {err}\n")
        return out.split()

    def IsRoot(self):
        """Returns whether adb has root privilege on the device.

        Raises:
            IOError if the `id` command fails or writes to stderr.
        """
        out, err, return_code = self.Execute("id")
        if err.strip() or return_code != 0:
            raise IOError("`id` stdout: %s\nstderr: %s \n" % (out, err))
        return "uid=0(root)" in out.strip()

    def _Test(self, *args):
        """Tests file types and status.

        Args:
            args: Strings, the arguments passed to the `test` command.

        Returns:
            A boolean, whether `test` exits with 0.

        Raises:
            IOError if the command writes anything to stdout or stderr.
        """
        out, err, return_code = self.Execute("test", *args)
        # `test` reports its result only through the return code. Any output
        # indicates that the command did not run as expected.
        if out.strip() or err.strip():
            raise IOError("`test` args: %s\nstdout: %s\nstderr: %s" %
                          (args, out, err))
        return return_code == 0

    def Exists(self, path):
        """Returns whether a path on the device exists."""
        return self._Test("-e", path)

    def IsDirectory(self, path):
        """Returns whether a path on the device is a directory."""
        return self._Test("-d", path)

    def _Stat(self, fmt, path):
        """Executes stat command.

        Args:
            fmt: A string, the argument of --format.
            path: A string, the path on the device.

        Returns:
            A string, the output with surrounding whitespace removed.

        Raises:
            IOError if the command fails or writes to stderr.
        """
        out, err, return_code = self.Execute("stat", "--format", fmt, path)
        if return_code != 0 or err.strip():
            raise IOError("`stat --format %s %s` stdout: %s\nstderr: %s" %
                          (fmt, path, out, err))
        return out.strip()

    def IsExecutable(self, path):
        """Returns if execute permission is granted to a path on the device.

        The result is True if any "x" appears in the permission string
        printed by `stat --format %A`.
        """
        return "x" in self._Stat("%A", path)

    def FindFiles(self, path, name_pattern, *options):
        """Executes find command.

        The pattern is wrapped in single quotes so that the shell does not
        expand it. The path and the options are passed as they are.

        Args:
            path: A string, the path on the device.
            name_pattern: A string, the pattern of the file name.
            options: Strings, extra options passed to the command.

        Returns:
            A list of strings, the paths to the found files.

        Raises:
            ValueError if the pattern contains quotes.
            IOError if the path does not exist.
        """
        # Quotes in the pattern would terminate the quoting added below.
        if '"' in name_pattern or "'" in name_pattern:
            raise ValueError("File name pattern contains quotes.")
        out, err, return_code = self.Execute("find", path, "-name",
                                             f"'{name_pattern}'", *options)
        if return_code != 0 or err.strip():
            raise IOError("`find %s -name '%s' %s` stdout: %s\nstderr: %s" %
                          (path, name_pattern, " ".join(options), out, err))

        # Return empty file list when out is an empty string.
        out = out.strip()
        return out.split("\n") if out else []
292