1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import subprocess
17import unittest
18
19from acts.libs.proc.process import Process
20from acts.libs.proc.process import ProcessError
21import mock
22
23
class FakeThread(object):
    """A synchronous stand-in for threading.Thread used by these tests.

    Nothing is ever run in the background: the ``target`` is only recorded
    so tests can assert on it, and ``start()`` merely flips the ``alive``
    flag and invokes the ``_on_start`` hook. Subclasses override
    ``_on_start`` to simulate the side effects a real thread would have.
    """

    def __init__(self, target=None):
        """Records the callable the thread would have run.

        Args:
            target: The callable a real thread would execute. It is stored
                but never invoked by this fake.
        """
        self.target = target
        self.alive = False

    def _on_start(self):
        """Hook invoked by start(). Does nothing unless overridden."""
        pass

    def start(self):
        """Marks the thread as alive and runs the _on_start hook, if any."""
        self.alive = True
        # The guard lets a subclass or instance disable the hook by setting
        # _on_start to a falsy value such as None.
        if self._on_start:
            self._on_start()

    def stop(self):
        """Marks the thread as no longer alive."""
        self.alive = False

    def is_alive(self):
        """Mirrors threading.Thread.is_alive() using the alive flag."""
        return self.alive

    def join(self, timeout=None):
        """Returns immediately; there is no real thread to wait for.

        Args:
            timeout: Accepted for signature compatibility with
                threading.Thread.join() and otherwise ignored.
        """
        pass
42
43
class ProcessTest(unittest.TestCase):
    """Tests the acts.libs.proc.process.Process class."""

    def setUp(self):
        """Remembers the real Process.__start_process for tearDown.

        The attribute is read from the class namespace instead of through
        normal attribute access. If the implementation wraps the function
        in staticmethod/classmethod (not visible from this file), attribute
        access would hand back the unwrapped/bound callable, and putting
        that back on the class in tearDown would change how it binds for
        every test that runs afterwards.
        """
        self._original_start_process = vars(Process)[
            '_Process__start_process']

    def tearDown(self):
        """Restores Process.__start_process, which some tests replace."""
        Process._Process__start_process = self._original_start_process

    @staticmethod
    def patch(imported_name, *args, **kwargs):
        """Returns a mock.patch for a name inside acts.libs.proc.process.

        Args:
            imported_name: The name to patch, relative to the
                acts.libs.proc.process module.
            *args: Positional arguments forwarded to mock.patch.
            **kwargs: Keyword arguments forwarded to mock.patch.
        """
        return mock.patch('acts.libs.proc.process.%s' % imported_name,
                          *args, **kwargs)

    @staticmethod
    def _make_process_starting_thread(process):
        """Returns a FakeThread subclass that sets process._process on start.

        start() needs the thread it spawns to create the process object, so
        the fake assigns a mock to process._process from its start hook.
        """

        class FakeThreadImpl(FakeThread):
            def _on_start(self):
                process._process = mock.Mock()

        return FakeThreadImpl

    def _stub_start_process(self):
        """Replaces Process.__start_process with a mock and returns it.

        The original is put back by tearDown.
        """
        stub = mock.Mock()
        Process._Process__start_process = stub
        return stub

    # set_on_output_callback

    def test_set_on_output_callback(self):
        """Tests that set_on_output_callback sets _on_output_callback."""
        callback = mock.Mock()

        process = Process('cmd').set_on_output_callback(callback)
        process._on_output_callback()

        self.assertTrue(callback.called)

    # set_on_terminate_callback

    def test_set_on_terminate_callback(self):
        """Tests that set_on_terminate_callback sets _on_terminate_callback."""
        callback = mock.Mock()

        process = Process('cmd').set_on_terminate_callback(callback)
        process._on_terminate_callback()

        self.assertTrue(callback.called)

    # start

    def test_start_raises_if_called_back_to_back(self):
        """Tests that start raises an exception if it has already been called
        prior.

        This is required to prevent references to processes and threads from
        being overwritten, potentially causing ACTS to hang."""
        process = Process('cmd')
        fake_thread_cls = self._make_process_starting_thread(process)

        with self.patch('Thread', fake_thread_cls):
            process.start()
            expected_msg = 'Process has already started.'
            with self.assertRaisesRegex(ProcessError, expected_msg):
                process.start()

    def test_start_starts_listening_thread(self):
        """Tests that start starts a thread targeting _exec_loop."""
        process = Process('cmd')
        fake_thread_cls = self._make_process_starting_thread(process)

        with self.patch('Thread', fake_thread_cls):
            process.start()

        self.assertTrue(process._listening_thread.alive)
        self.assertEqual(process._listening_thread.target, process._exec_loop)

    # wait

    def test_wait_raises_if_called_back_to_back(self):
        """Tests that wait raises an exception if it has already been called
        prior."""
        process = Process('cmd')
        process._process = mock.Mock()

        process.wait(0)
        expected_msg = 'Process is already being stopped.'
        with self.assertRaisesRegex(ProcessError, expected_msg):
            process.wait(0)

    @mock.patch.object(Process, '_kill_process')
    def test_wait_kills_after_timeout(self, mock_kill_process):
        """Tests that if a TimeoutExpired error is thrown during wait, the
        process is killed."""
        process = Process('cmd')
        process._process = mock.Mock()
        process._process.wait.side_effect = subprocess.TimeoutExpired('', '')

        process.wait(0)

        self.assertTrue(mock_kill_process.called)

    # Decorators are applied bottom-up, so the first injected mock is
    # os.killpg; the os.getpgid mock is not inspected and lands in *_.
    @mock.patch('os.getpgid', side_effect=lambda pid: pid)
    @mock.patch('os.killpg')
    def test_sends_signal(self, mock_killpg, *_):
        """Tests that signal() sends the signal to the process group."""
        process = Process('cmd')
        mock_process = mock.Mock()
        mock_process.pid = -1
        process._process = mock_process

        process.signal(51641)

        mock_killpg.assert_called_with(-1, 51641)

    def test_signal_raises_error_on_windows(self):
        """Tests that signal() raises a ProcessError when running on
        Windows."""
        process = Process('cmd')
        mock_inner_process = mock.Mock()
        mock_inner_process.pid = -1
        process._process = mock_inner_process

        with self.patch('_on_windows', True):
            with self.assertRaises(ProcessError):
                process.signal(51641)

    def test_wait_sets_stopped_to_true_before_process_kill(self):
        """Tests that wait() sets _stopped to True before killing the process.

        This order is required to prevent the _exec_loop from calling
        _on_terminate_callback when the user has killed the process.
        """
        kill_checks = []

        def assert_stopped_on_kill():
            self.assertTrue(process._stopped)
            kill_checks.append(True)

        process = Process('cmd')
        process._process = mock.Mock()
        process._process.poll.return_value = None
        process._process.wait.side_effect = subprocess.TimeoutExpired('', '')
        # Assigning on the instance shadows the real method, so no
        # class-level patch is needed.
        process._kill_process = assert_stopped_on_kill

        process.wait()

        self.assertTrue(kill_checks)

    def test_wait_joins_listening_thread_if_it_exists(self):
        """Tests wait() joins _listening_thread if it exists."""
        process = Process('cmd')
        process._process = mock.Mock()
        mocked_thread = mock.Mock()
        process._listening_thread = mocked_thread

        process.wait(0)

        self.assertTrue(mocked_thread.join.called)

    def test_wait_clears_listening_thread_if_it_exists(self):
        """Tests wait() clears _listening_thread if it exists.

        Threads can only be started once, so after wait has been called, we
        want to make sure we clear the listening thread.
        """
        process = Process('cmd')
        process._process = mock.Mock()
        process._listening_thread = mock.Mock()

        process.wait(0)

        self.assertIsNone(process._listening_thread)

    def test_wait_joins_redirection_thread_if_it_exists(self):
        """Tests wait() joins _redirection_thread if it exists."""
        process = Process('cmd')
        process._process = mock.Mock()
        mocked_thread = mock.Mock()
        process._redirection_thread = mocked_thread

        process.wait(0)

        self.assertTrue(mocked_thread.join.called)

    def test_wait_clears_redirection_thread_if_it_exists(self):
        """Tests wait() clears _redirection_thread if it exists.

        Threads can only be started once, so after wait has been called, we
        want to make sure we clear the redirection thread.
        """
        process = Process('cmd')
        process._process = mock.Mock()
        process._redirection_thread = mock.Mock()

        process.wait(0)

        self.assertIsNone(process._redirection_thread)

    # stop

    def test_stop_sets_stopped_to_true(self):
        """Tests that stop() sets the _stopped attribute to True."""
        process = Process('cmd')
        process._process = mock.Mock()

        process.stop()

        self.assertTrue(process._stopped)

    def test_stop_sets_stopped_to_true_before_process_kill(self):
        """Tests that stop() sets _stopped to True before killing the process.

        This order is required to prevent the _exec_loop from calling
        _on_terminate_callback when the user has killed the process.
        """
        kill_checks = []

        def assert_stopped_on_kill():
            self.assertTrue(process._stopped)
            kill_checks.append(True)

        process = Process('cmd')
        process._process = mock.Mock()
        process._process.poll.return_value = None
        process._kill_process = assert_stopped_on_kill
        process._process.wait.side_effect = subprocess.TimeoutExpired('', '')

        process.stop()

        self.assertTrue(kill_checks)

    def test_stop_calls_wait(self):
        """Tests that stop() also has the functionality of wait()."""
        process = Process('cmd')
        process._process = mock.Mock()
        process.wait = mock.Mock()

        process.stop()

        self.assertTrue(process.wait.called)

    # _redirect_output

    def test_redirect_output_feeds_all_lines_to_on_output_callback(self):
        """Tests that _redirect_output loops until all lines are parsed."""
        received_lines = []

        process = Process('cmd')
        process.set_on_output_callback(received_lines.append)
        process._process = mock.Mock()
        # The trailing empty bytes object marks the end of the stream.
        process._process.stdout.readline.side_effect = [b'a\n', b'b\n', b'']

        process._redirect_output()

        self.assertEqual(received_lines, ['a', 'b'])

    # __start_process

    def test_start_process_returns_a_popen_object(self):
        """Tests that a Popen object is returned by __start_process."""
        with self.patch('subprocess.Popen', return_value='verification'):
            self.assertEqual(Process._Process__start_process('cmd'),
                             'verification')

    # _exec_loop

    def test_exec_loop_redirections_output(self):
        """Tests that the _exec_loop function calls to redirect the output."""
        process = Process('cmd')
        self._stub_start_process()

        with self.patch('Thread', FakeThread):
            process._exec_loop()

        self.assertEqual(process._redirection_thread.target,
                         process._redirect_output)
        self.assertTrue(process._redirection_thread.alive)

    def test_exec_loop_waits_for_process(self):
        """Tests that the _exec_loop waits for the process to complete before
        returning."""
        process = Process('cmd')
        self._stub_start_process()

        with self.patch('Thread', FakeThread):
            process._exec_loop()

        self.assertTrue(process._process.wait.called)

    def test_exec_loop_loops_if_not_stopped(self):
        """Tests that _exec_loop restarts the process with the command
        returned by _on_terminate_callback until that callback returns
        None."""
        process = Process('1st')
        start_process = self._stub_start_process()
        process._on_terminate_callback = mock.Mock(side_effect=[['2nd'], None])

        with self.patch('Thread', FakeThread):
            process._exec_loop()

        self.assertEqual(start_process.call_count, 2)
        self.assertEqual(start_process.call_args_list[0][0], (['1st'],))
        self.assertEqual(start_process.call_args_list[1][0], (['2nd'],))

    def test_exec_loop_does_not_loop_if_stopped(self):
        """Tests that _exec_loop starts the process only once when _stopped
        is already True, even though _on_terminate_callback would supply
        another command."""
        process = Process('1st')
        start_process = self._stub_start_process()
        process._on_terminate_callback = mock.Mock(
            side_effect=[['2nd'], None])
        process._stopped = True

        with self.patch('Thread', FakeThread):
            process._exec_loop()

        self.assertEqual(start_process.call_count, 1)
        self.assertEqual(start_process.call_args_list[0][0], (['1st'],))
365
366
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
369