1#!/usr/bin/env python3
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import json
18import logging
19import math
20import os
21import shlex
22import subprocess
23import threading
24import time
25
26from acts import context
27from acts import logger as acts_logger
28from acts import utils
29from acts.controllers.android_device import AndroidDevice
30from acts.controllers.utils_lib.ssh import connection
31from acts.controllers.utils_lib.ssh import settings
32from acts.event import event_bus
33from acts.event.decorators import subscribe_static
34from acts.event.event import TestClassBeginEvent
35from acts.event.event import TestClassEndEvent
36from acts.libs.proc import job
37
38MOBLY_CONTROLLER_CONFIG_NAME = 'IPerfServer'
39ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers'
40KILOBITS = 1024
41MEGABITS = KILOBITS * 1024
42GIGABITS = MEGABITS * 1024
43BITS_IN_BYTE = 8
44
45
46def create(configs):
47    """ Factory method for iperf servers.
48
49    The function creates iperf servers based on at least one config.
50    If configs only specify a port number, a regular local IPerfServer object
51    will be created. If configs contains ssh settings or and AndroidDevice,
52    remote iperf servers will be started on those devices
53
54    Args:
55        configs: config parameters for the iperf server
56    """
57    results = []
58    for c in configs:
59        if type(c) in (str, int) and str(c).isdigit():
60            results.append(IPerfServer(int(c)))
61        elif type(c) is dict and 'AndroidDevice' in c and 'port' in c:
62            results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port']))
63        elif type(c) is dict and 'ssh_config' in c and 'port' in c:
64            results.append(
65                IPerfServerOverSsh(c['ssh_config'],
66                                   c['port'],
67                                   test_interface=c.get('test_interface'),
68                                   use_killall=c.get('use_killall')))
69        else:
70            raise ValueError(
71                'Config entry %s in %s is not a valid IPerfServer '
72                'config.' % (repr(c), configs))
73    return results
74
75
76def get_info(iperf_servers):
77    """Placeholder for info about iperf servers
78
79    Returns:
80        None
81    """
82    return None
83
84
85def destroy(iperf_server_list):
86    for iperf_server in iperf_server_list:
87        try:
88            iperf_server.stop()
89        except Exception:
90            logging.exception('Unable to properly clean up %s.' % iperf_server)
91
92
93class IPerfResult(object):
94
95    def __init__(self, result_path, reporting_speed_units='Mbytes'):
96        """Loads iperf result from file.
97
98        Loads iperf result from JSON formatted server log. File can be accessed
99        before or after server is stopped. Note that only the first JSON object
100        will be loaded and this funtion is not intended to be used with files
101        containing multiple iperf client runs.
102        """
103        # if result_path isn't a path, treat it as JSON
104        self.reporting_speed_units = reporting_speed_units
105        if not os.path.exists(result_path):
106            self.result = json.loads(result_path)
107        else:
108            try:
109                with open(result_path, 'r') as f:
110                    iperf_output = f.readlines()
111                    if '}\n' in iperf_output:
112                        iperf_output = iperf_output[:iperf_output.index('}\n'
113                                                                        ) + 1]
114                    iperf_string = ''.join(iperf_output)
115                    iperf_string = iperf_string.replace('nan', '0')
116                    self.result = json.loads(iperf_string)
117            except ValueError:
118                with open(result_path, 'r') as f:
119                    # Possibly a result from interrupted iperf run,
120                    # skip first line and try again.
121                    lines = f.readlines()[1:]
122                    self.result = json.loads(''.join(lines))
123
124    def _has_data(self):
125        """Checks if the iperf result has valid throughput data.
126
127        Returns:
128            True if the result contains throughput data. False otherwise.
129        """
130        return ('end' in self.result) and ('sum_received' in self.result['end']
131                                           or 'sum' in self.result['end'])
132
133    def _get_reporting_speed(self, network_speed_in_bits_per_second):
134        """Sets the units for the network speed reporting based on how the
135        object was initiated.  Defaults to Megabytes per second.  Currently
136        supported, bits per second (bits), kilobits per second (kbits), megabits
137        per second (mbits), gigabits per second (gbits), bytes per second
138        (bytes), kilobits per second (kbytes), megabits per second (mbytes),
139        gigabytes per second (gbytes).
140
141        Args:
142            network_speed_in_bits_per_second: The network speed from iperf in
143                bits per second.
144
145        Returns:
146            The value of the throughput in the appropriate units.
147        """
148        speed_divisor = 1
149        if self.reporting_speed_units[1:].lower() == 'bytes':
150            speed_divisor = speed_divisor * BITS_IN_BYTE
151        if self.reporting_speed_units[0:1].lower() == 'k':
152            speed_divisor = speed_divisor * KILOBITS
153        if self.reporting_speed_units[0:1].lower() == 'm':
154            speed_divisor = speed_divisor * MEGABITS
155        if self.reporting_speed_units[0:1].lower() == 'g':
156            speed_divisor = speed_divisor * GIGABITS
157        return network_speed_in_bits_per_second / speed_divisor
158
159    def get_json(self):
160        """Returns the raw json output from iPerf."""
161        return self.result
162
163    @property
164    def error(self):
165        return self.result.get('error', None)
166
167    @property
168    def avg_rate(self):
169        """Average UDP rate in MB/s over the entire run.
170
171        This is the average UDP rate observed at the terminal the iperf result
172        is pulled from. According to iperf3 documentation this is calculated
173        based on bytes sent and thus is not a good representation of the
174        quality of the link. If the result is not from a success run, this
175        property is None.
176        """
177        if not self._has_data() or 'sum' not in self.result['end']:
178            return None
179        bps = self.result['end']['sum']['bits_per_second']
180        return self._get_reporting_speed(bps)
181
182    @property
183    def avg_receive_rate(self):
184        """Average receiving rate in MB/s over the entire run.
185
186        This data may not exist if iperf was interrupted. If the result is not
187        from a success run, this property is None.
188        """
189        if not self._has_data() or 'sum_received' not in self.result['end']:
190            return None
191        bps = self.result['end']['sum_received']['bits_per_second']
192        return self._get_reporting_speed(bps)
193
194    @property
195    def avg_send_rate(self):
196        """Average sending rate in MB/s over the entire run.
197
198        This data may not exist if iperf was interrupted. If the result is not
199        from a success run, this property is None.
200        """
201        if not self._has_data() or 'sum_sent' not in self.result['end']:
202            return None
203        bps = self.result['end']['sum_sent']['bits_per_second']
204        return self._get_reporting_speed(bps)
205
206    @property
207    def instantaneous_rates(self):
208        """Instantaneous received rate in MB/s over entire run.
209
210        This data may not exist if iperf was interrupted. If the result is not
211        from a success run, this property is None.
212        """
213        if not self._has_data():
214            return None
215        intervals = [
216            self._get_reporting_speed(interval['sum']['bits_per_second'])
217            for interval in self.result['intervals']
218        ]
219        return intervals
220
221    @property
222    def std_deviation(self):
223        """Standard deviation of rates in MB/s over entire run.
224
225        This data may not exist if iperf was interrupted. If the result is not
226        from a success run, this property is None.
227        """
228        return self.get_std_deviation(0)
229
230    def get_std_deviation(self, iperf_ignored_interval):
231        """Standard deviation of rates in MB/s over entire run.
232
233        This data may not exist if iperf was interrupted. If the result is not
234        from a success run, this property is None. A configurable number of
235        beginning (and the single last) intervals are ignored in the
236        calculation as they are inaccurate (e.g. the last is from a very small
237        interval)
238
239        Args:
240            iperf_ignored_interval: number of iperf interval to ignored in
241            calculating standard deviation
242
243        Returns:
244            The standard deviation.
245        """
246        if not self._has_data():
247            return None
248        instantaneous_rates = self.instantaneous_rates[
249            iperf_ignored_interval:-1]
250        avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates)
251        sqd_deviations = ([(rate - avg_rate)**2
252                           for rate in instantaneous_rates])
253        std_dev = math.sqrt(
254            math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))
255        return std_dev
256
257
258class IPerfServerBase(object):
259    # Keeps track of the number of IPerfServer logs to prevent file name
260    # collisions.
261    __log_file_counter = 0
262
263    __log_file_lock = threading.Lock()
264
265    def __init__(self, port):
266        self._port = port
267        # TODO(markdr): We shouldn't be storing the log files in an array like
268        # this. Nobody should be reading this property either. Instead, the
269        # IPerfResult should be returned in stop() with all the necessary info.
270        # See aosp/1012824 for a WIP implementation.
271        self.log_files = []
272
273    @property
274    def port(self):
275        raise NotImplementedError('port must be specified.')
276
277    @property
278    def started(self):
279        raise NotImplementedError('started must be specified.')
280
281    def start(self, extra_args='', tag=''):
282        """Starts an iperf3 server.
283
284        Args:
285            extra_args: A string representing extra arguments to start iperf
286                server with.
287            tag: Appended to log file name to identify logs from different
288                iperf runs.
289        """
290        raise NotImplementedError('start() must be specified.')
291
292    def stop(self):
293        """Stops the iperf server.
294
295        Returns:
296            The name of the log file generated from the terminated session.
297        """
298        raise NotImplementedError('stop() must be specified.')
299
300    def _get_full_file_path(self, tag=None):
301        """Returns the full file path for the IPerfServer log file.
302
303        Note: If the directory for the file path does not exist, it will be
304        created.
305
306        Args:
307            tag: The tag passed in to the server run.
308        """
309        out_dir = self.log_path
310
311        with IPerfServerBase.__log_file_lock:
312            tags = [tag, IPerfServerBase.__log_file_counter]
313            out_file_name = 'IPerfServer,%s.log' % (','.join(
314                [str(x) for x in tags if x != '' and x is not None]))
315            IPerfServerBase.__log_file_counter += 1
316
317        file_path = os.path.join(out_dir, out_file_name)
318        self.log_files.append(file_path)
319        return file_path
320
321    @property
322    def log_path(self):
323        current_context = context.get_current_context()
324        full_out_dir = os.path.join(current_context.get_full_output_path(),
325                                    'IPerfServer%s' % self.port)
326
327        # Ensure the directory exists.
328        os.makedirs(full_out_dir, exist_ok=True)
329
330        return full_out_dir
331
332
333def _get_port_from_ss_output(ss_output, pid):
334    pid = str(pid)
335    lines = ss_output.split('\n')
336    for line in lines:
337        if pid in line:
338            # Expected format:
339            # tcp LISTEN  0 5 *:<PORT>  *:* users:(("cmd",pid=<PID>,fd=3))
340            return line.split()[4].split(':')[-1]
341    else:
342        raise ProcessLookupError('Could not find started iperf3 process.')
343
344
345class IPerfServer(IPerfServerBase):
346    """Class that handles iperf server commands on localhost."""
347
348    def __init__(self, port=5201):
349        super().__init__(port)
350        self._hinted_port = port
351        self._current_log_file = None
352        self._iperf_process = None
353        self._last_opened_file = None
354
355    @property
356    def port(self):
357        return self._port
358
359    @property
360    def started(self):
361        return self._iperf_process is not None
362
363    def start(self, extra_args='', tag=''):
364        """Starts iperf server on local machine.
365
366        Args:
367            extra_args: A string representing extra arguments to start iperf
368                server with.
369            tag: Appended to log file name to identify logs from different
370                iperf runs.
371        """
372        if self._iperf_process is not None:
373            return
374
375        self._current_log_file = self._get_full_file_path(tag)
376
377        # Run an iperf3 server on the hinted port with JSON output.
378        command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J']
379
380        command.extend(shlex.split(extra_args))
381
382        if self._last_opened_file:
383            self._last_opened_file.close()
384        self._last_opened_file = open(self._current_log_file, 'w')
385        self._iperf_process = subprocess.Popen(command,
386                                               stdout=self._last_opened_file,
387                                               stderr=subprocess.DEVNULL)
388        for attempts_left in reversed(range(3)):
389            try:
390                self._port = int(
391                    _get_port_from_ss_output(
392                        job.run('ss -l -p -n | grep iperf').stdout,
393                        self._iperf_process.pid))
394                break
395            except ProcessLookupError:
396                if attempts_left == 0:
397                    raise
398                logging.debug('iperf3 process not started yet.')
399                time.sleep(.01)
400
401    def stop(self):
402        """Stops the iperf server.
403
404        Returns:
405            The name of the log file generated from the terminated session.
406        """
407        if self._iperf_process is None:
408            return
409
410        if self._last_opened_file:
411            self._last_opened_file.close()
412            self._last_opened_file = None
413
414        self._iperf_process.terminate()
415        self._iperf_process = None
416
417        return self._current_log_file
418
419    def __del__(self):
420        self.stop()
421
422
423class IPerfServerOverSsh(IPerfServerBase):
424    """Class that handles iperf3 operations on remote machines."""
425
426    def __init__(self,
427                 ssh_config,
428                 port,
429                 test_interface=None,
430                 use_killall=False):
431        super().__init__(port)
432        self.ssh_settings = settings.from_config(ssh_config)
433        self.log = acts_logger.create_tagged_trace_logger(
434            f'IPerfServer | {self.ssh_settings.hostname}')
435        self._ssh_session = None
436        self.start_ssh()
437
438        self._iperf_pid = None
439        self._current_tag = None
440        self.hostname = self.ssh_settings.hostname
441        self._use_killall = str(use_killall).lower() == 'true'
442        try:
443            # A test interface can only be found if an ip address is specified.
444            # A fully qualified hostname will return None for the
445            # test_interface.
446            self.test_interface = test_interface if test_interface else utils.get_interface_based_on_ip(
447                self._ssh_session, self.hostname)
448        except Exception as e:
449            self.log.warning(e)
450            self.test_interface = None
451
452    @property
453    def port(self):
454        return self._port
455
456    @property
457    def started(self):
458        return self._iperf_pid is not None
459
460    def _get_remote_log_path(self):
461        return '/tmp/iperf_server_port%s.log' % self.port
462
463    def get_interface_ip_addresses(self, interface):
464        """Gets all of the ip addresses, ipv4 and ipv6, associated with a
465           particular interface name.
466
467        Args:
468            interface: The interface name on the device, ie eth0
469
470        Returns:
471            A list of dictionaries of the various IP addresses. See
472            utils.get_interface_ip_addresses.
473        """
474        if not self._ssh_session:
475            self.start_ssh()
476
477        return utils.get_interface_ip_addresses(self._ssh_session, interface)
478
479    def renew_test_interface_ip_address(self):
480        """Renews the test interface's IPv4 address.
481
482        Necessary for changing DHCP scopes during a test.
483        """
484        if not self._ssh_session:
485            self.start_ssh()
486        utils.renew_linux_ip_address(self._ssh_session, self.test_interface)
487
488    def get_addr(self, addr_type='ipv4_private', timeout_sec=None):
489        """Wait until a type of IP address on the test interface is available
490        then return it.
491        """
492        if not self._ssh_session:
493            self.start_ssh()
494        return utils.get_addr(self._ssh_session, self.test_interface,
495                              addr_type, timeout_sec)
496
497    def _cleanup_iperf_port(self):
498        """Checks and kills zombie iperf servers occupying intended port."""
499        iperf_check_cmd = ('netstat -tulpn | grep LISTEN | grep iperf3'
500                           ' | grep :{}').format(self.port)
501        iperf_check = self._ssh_session.run(iperf_check_cmd,
502                                            ignore_status=True)
503        iperf_check = iperf_check.stdout
504        if iperf_check:
505            logging.debug('Killing zombie server on port {}'.format(self.port))
506            iperf_pid = iperf_check.split(' ')[-1].split('/')[0]
507            self._ssh_session.run('kill -9 {}'.format(str(iperf_pid)))
508
509    def start(self, extra_args='', tag='', iperf_binary=None):
510        """Starts iperf server on specified machine and port.
511
512        Args:
513            extra_args: A string representing extra arguments to start iperf
514                server with.
515            tag: Appended to log file name to identify logs from different
516                iperf runs.
517            iperf_binary: Location of iperf3 binary. If none, it is assumed the
518                the binary is in the path.
519        """
520        if self.started:
521            return
522
523        if not self._ssh_session:
524            self.start_ssh()
525        self._cleanup_iperf_port()
526        if not iperf_binary:
527            logging.debug('No iperf3 binary specified.  '
528                          'Assuming iperf3 is in the path.')
529            iperf_binary = 'iperf3'
530        else:
531            logging.debug('Using iperf3 binary located at %s' % iperf_binary)
532        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)
533
534        cmd = '{cmd} {extra_flags} > {log_file}'.format(
535            cmd=iperf_command,
536            extra_flags=extra_args,
537            log_file=self._get_remote_log_path())
538
539        job_result = self._ssh_session.run_async(cmd)
540        self._iperf_pid = job_result.stdout
541        self._current_tag = tag
542
543    def stop(self):
544        """Stops the iperf server.
545
546        Returns:
547            The name of the log file generated from the terminated session.
548        """
549        if not self.started:
550            return
551
552        if self._use_killall:
553            self._ssh_session.run('killall iperf3', ignore_status=True)
554        else:
555            self._ssh_session.run_async('kill -9 {}'.format(
556                str(self._iperf_pid)))
557
558        iperf_result = self._ssh_session.run('cat {}'.format(
559            self._get_remote_log_path()))
560
561        log_file = self._get_full_file_path(self._current_tag)
562        with open(log_file, 'w') as f:
563            f.write(iperf_result.stdout)
564
565        self._ssh_session.run_async('rm {}'.format(
566            self._get_remote_log_path()))
567        self._iperf_pid = None
568        return log_file
569
570    def start_ssh(self):
571        """Starts an ssh session to the iperf server."""
572        if not self._ssh_session:
573            self._ssh_session = connection.SshConnection(self.ssh_settings)
574
575    def close_ssh(self):
576        """Closes the ssh session to the iperf server, if one exists, preventing
577        connection reset errors when rebooting server device.
578        """
579        if self.started:
580            self.stop()
581        if self._ssh_session:
582            self._ssh_session.close()
583            self._ssh_session = None
584
585
586# TODO(markdr): Remove this after automagic controller creation has been
587# removed.
588class _AndroidDeviceBridge(object):
589    """A helper class for connecting serial numbers to AndroidDevices."""
590
591    _test_class = None
592
593    @staticmethod
594    @subscribe_static(TestClassBeginEvent)
595    def on_test_begin(event):
596        _AndroidDeviceBridge._test_class = event.test_class
597
598    @staticmethod
599    @subscribe_static(TestClassEndEvent)
600    def on_test_end(_):
601        _AndroidDeviceBridge._test_class = None
602
603    @staticmethod
604    def android_devices():
605        """A dict of serial -> AndroidDevice, where AndroidDevice is a device
606        found in the current TestClass's controllers.
607        """
608        if not _AndroidDeviceBridge._test_class:
609            return {}
610        return {
611            device.serial: device
612            for device in _AndroidDeviceBridge._test_class.android_devices
613        }
614
615
616event_bus.register_subscription(
617    _AndroidDeviceBridge.on_test_begin.subscription)
618event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription)
619
620
621class IPerfServerOverAdb(IPerfServerBase):
622    """Class that handles iperf3 operations over ADB devices."""
623
624    def __init__(self, android_device_or_serial, port):
625        """Creates a new IPerfServerOverAdb object.
626
627        Args:
628            android_device_or_serial: Either an AndroidDevice object, or the
629                serial that corresponds to the AndroidDevice. Note that the
630                serial must be present in an AndroidDevice entry in the ACTS
631                config.
632            port: The port number to open the iperf server on.
633        """
634        super().__init__(port)
635        self._android_device_or_serial = android_device_or_serial
636
637        self._iperf_process = None
638        self._current_tag = ''
639
640    @property
641    def port(self):
642        return self._port
643
644    @property
645    def started(self):
646        return self._iperf_process is not None
647
648    @property
649    def _android_device(self):
650        if isinstance(self._android_device_or_serial, AndroidDevice):
651            return self._android_device_or_serial
652        else:
653            return _AndroidDeviceBridge.android_devices()[
654                self._android_device_or_serial]
655
656    def _get_device_log_path(self):
657        return '~/data/iperf_server_port%s.log' % self.port
658
659    def start(self, extra_args='', tag='', iperf_binary=None):
660        """Starts iperf server on an ADB device.
661
662        Args:
663            extra_args: A string representing extra arguments to start iperf
664                server with.
665            tag: Appended to log file name to identify logs from different
666                iperf runs.
667            iperf_binary: Location of iperf3 binary. If none, it is assumed the
668                the binary is in the path.
669        """
670        if self._iperf_process is not None:
671            return
672
673        if not iperf_binary:
674            logging.debug('No iperf3 binary specified.  '
675                          'Assuming iperf3 is in the path.')
676            iperf_binary = 'iperf3'
677        else:
678            logging.debug('Using iperf3 binary located at %s' % iperf_binary)
679        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)
680
681        self._iperf_process = self._android_device.adb.shell_nb(
682            '{cmd} {extra_flags} > {log_file}'.format(
683                cmd=iperf_command,
684                extra_flags=extra_args,
685                log_file=self._get_device_log_path()))
686
687        self._iperf_process_adb_pid = ''
688        while len(self._iperf_process_adb_pid) == 0:
689            self._iperf_process_adb_pid = self._android_device.adb.shell(
690                'pgrep iperf3 -n')
691
692        self._current_tag = tag
693
694    def stop(self):
695        """Stops the iperf server.
696
697        Returns:
698            The name of the log file generated from the terminated session.
699        """
700        if self._iperf_process is None:
701            return
702
703        job.run('kill -9 {}'.format(self._iperf_process.pid))
704
705        # TODO(markdr): update with definitive kill method
706        while True:
707            iperf_process_list = self._android_device.adb.shell('pgrep iperf3')
708            if iperf_process_list.find(self._iperf_process_adb_pid) == -1:
709                break
710            else:
711                self._android_device.adb.shell("kill -9 {}".format(
712                    self._iperf_process_adb_pid))
713
714        iperf_result = self._android_device.adb.shell('cat {}'.format(
715            self._get_device_log_path()))
716
717        log_file = self._get_full_file_path(self._current_tag)
718        with open(log_file, 'w') as f:
719            f.write(iperf_result)
720
721        self._android_device.adb.shell('rm {}'.format(
722            self._get_device_log_path()))
723
724        self._iperf_process = None
725        return log_file
726