1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import os
19import shlex
20import signal
21import subprocess
22import sys
23import time
24from threading import Thread
25
26_on_windows = sys.platform == 'win32'
27
28
29class ProcessError(Exception):
30    """Raised when invalid operations are run on a Process."""
31
32
33class Process(object):
34    """A Process object used to run various commands.
35
36    Attributes:
37        _command: The initial command to run.
38        _subprocess_kwargs: The kwargs to send to Popen for more control over
39                            execution.
40        _process: The subprocess.Popen object currently executing a process.
41        _listening_thread: The thread that is listening for the process to stop.
42        _redirection_thread: The thread that is redirecting process output.
43        _on_output_callback: The callback to call when output is received.
44        _on_terminate_callback: The callback to call when the process terminates
45                                without stop() being called first.
46        _started: Whether or not start() was called.
47        _stopped: Whether or not stop() was called.
48    """
49
50    def __init__(self, command, **kwargs):
51        """Creates a Process object.
52
53        Note that this constructor does not begin the process. To start the
54        process, use Process.start().
55        """
56        # Split command string into list if shell=True is not specified
57        self._use_shell = kwargs.get('shell', False)
58        if not self._use_shell and isinstance(command, str):
59            command = shlex.split(command)
60        self._command = command
61        self._subprocess_kwargs = kwargs
62        if _on_windows:
63            self._subprocess_kwargs['creationflags'] = (
64                subprocess.CREATE_NEW_PROCESS_GROUP)
65        else:
66            self._subprocess_kwargs['start_new_session'] = True
67        self._process = None
68
69        self._listening_thread = None
70        self._redirection_thread = None
71        self._on_output_callback = lambda *args, **kw: None
72        self._binary_output = False
73        self._on_terminate_callback = lambda *args, **kw: ''
74
75        self._started = False
76        self._stopped = False
77
78    def set_on_output_callback(self, on_output_callback, binary=False):
79        """Sets the on_output_callback function.
80
81        Args:
82            on_output_callback: The function to be called when output is sent to
83                the output. The output callback has the following signature:
84
85                >>> def on_output_callback(output_line):
86                >>>     return None
87
88            binary: If True, read the process output as raw binary.
89        Returns:
90            self
91        """
92        self._on_output_callback = on_output_callback
93        self._binary_output = binary
94        return self
95
96    def set_on_terminate_callback(self, on_terminate_callback):
97        """Sets the on_self_terminate callback function.
98
99        Args:
100            on_terminate_callback: The function to be called when the process
101                has terminated on its own. The callback has the following
102                signature:
103
104                >>> def on_self_terminate_callback(popen_process):
105                >>>     return 'command to run' or None
106
107                If a string is returned, the string returned will be the command
108                line used to run the command again. If None is returned, the
109                process will end without restarting.
110
111        Returns:
112            self
113        """
114        self._on_terminate_callback = on_terminate_callback
115        return self
116
117    def start(self):
118        """Starts the process's execution."""
119        if self._started:
120            raise ProcessError('Process has already started.')
121        self._started = True
122        self._process = None
123
124        self._listening_thread = Thread(target=self._exec_loop)
125        self._listening_thread.start()
126
127        time_up_at = time.time() + 1
128
129        while self._process is None:
130            if time.time() > time_up_at:
131                raise OSError('Unable to open process!')
132
133        self._stopped = False
134
135    @staticmethod
136    def _get_timeout_left(timeout, start_time):
137        return max(.1, timeout - (time.time() - start_time))
138
139    def is_running(self):
140        """Checks that the underlying Popen process is still running
141
142        Returns:
143            True if the process is running.
144        """
145        return self._process is not None and self._process.poll() is None
146
147    def _join_threads(self):
148        """Waits for the threads associated with the process to terminate."""
149        if self._listening_thread is not None:
150            self._listening_thread.join()
151            self._listening_thread = None
152
153        if self._redirection_thread is not None:
154            self._redirection_thread.join()
155            self._redirection_thread = None
156
157    def _kill_process(self):
158        """Kills the underlying process/process group. Implementation is
159        platform-dependent."""
160        if _on_windows:
161            subprocess.check_call('taskkill /F /T /PID %s' % self._process.pid)
162        else:
163            self.signal(signal.SIGKILL)
164
165    def wait(self, kill_timeout=60.0):
166        """Waits for the process to finish execution.
167
168        If the process has reached the kill_timeout, the process will be killed
169        instead.
170
171        Note: the on_self_terminate callback will NOT be called when calling
172        this function.
173
174        Args:
175            kill_timeout: The amount of time to wait until killing the process.
176        """
177        if self._stopped:
178            raise ProcessError('Process is already being stopped.')
179        self._stopped = True
180
181        try:
182            self._process.wait(kill_timeout)
183        except subprocess.TimeoutExpired:
184            self._kill_process()
185        finally:
186            self._join_threads()
187            self._started = False
188
189    def signal(self, sig):
190        """Sends a signal to the process.
191
192        Args:
193            sig: The signal to be sent.
194        """
195        if _on_windows:
196            raise ProcessError('Unable to call Process.signal on windows.')
197
198        pgid = os.getpgid(self._process.pid)
199        os.killpg(pgid, sig)
200
201    def stop(self):
202        """Stops the process.
203
204        This command is effectively equivalent to kill, but gives time to clean
205        up any related work on the process, such as output redirection.
206
207        Note: the on_self_terminate callback will NOT be called when calling
208        this function.
209        """
210        self.wait(0)
211
212    def _redirect_output(self):
213        """Redirects the output from the command into the on_output_callback."""
214        if self._binary_output:
215            while True:
216                data = self._process.stdout.read(1024)
217
218                if not data:
219                    return
220                else:
221                    self._on_output_callback(data)
222        else:
223            while True:
224                line = self._process.stdout.readline().decode('utf-8',
225                                                              errors='replace')
226
227                if not line:
228                    return
229                else:
230                    # Output the line without trailing \n and whitespace.
231                    self._on_output_callback(line.rstrip())
232
233    @staticmethod
234    def __start_process(command, **kwargs):
235        """A convenient wrapper function for starting the process."""
236        acts_logger = logging.getLogger()
237        acts_logger.debug(
238            'Starting command "%s" with kwargs %s', command, kwargs)
239        return subprocess.Popen(command, **kwargs)
240
241    def _exec_loop(self):
242        """Executes Popen in a loop.
243
244        When Popen terminates without stop() being called,
245        self._on_terminate_callback() will be called. The returned value from
246        _on_terminate_callback will then be used to determine if the loop should
247        continue and start up the process again. See set_on_terminate_callback()
248        for more information.
249        """
250        command = self._command
251        while True:
252            self._process = self.__start_process(command,
253                                                 stdout=subprocess.PIPE,
254                                                 stderr=subprocess.STDOUT,
255                                                 bufsize=1,
256                                                 **self._subprocess_kwargs)
257            self._redirection_thread = Thread(target=self._redirect_output)
258            self._redirection_thread.start()
259            self._process.wait()
260
261            if self._stopped:
262                logging.debug('The process for command %s was stopped.',
263                              command)
264                break
265            else:
266                logging.debug('The process for command %s terminated.',
267                              command)
268                # Wait for all output to be processed before sending
269                # _on_terminate_callback()
270                self._redirection_thread.join()
271                logging.debug('Beginning on_terminate_callback for %s.',
272                              command)
273                retry_value = self._on_terminate_callback(self._process)
274                if retry_value:
275                    if not self._use_shell and isinstance(retry_value, str):
276                        retry_value = shlex.split(retry_value)
277                    command = retry_value
278                else:
279                    break
280