1#!/usr/bin/env python3
2
3#   Copyright 2016- The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""A helper module to communicate over telnet with AttenuatorInstruments.
17
18User code shouldn't need to directly access this class.
19"""
20
21import logging
22import telnetlib
23import re
24from acts.controllers import attenuator
25from acts.libs.proc import job
26
27
28def _ascii_string(uc_string):
29    return str(uc_string).encode('ASCII')
30
31
32class _TNHelper(object):
33    """An internal helper class for Telnet+SCPI command-based instruments.
34
35    It should only be used by those implementation control libraries and not by
36    any user code directly.
37    """
38    def __init__(self,
39                 tx_cmd_separator='\n',
40                 rx_cmd_separator='\n',
41                 prompt=''):
42        self._tn = None
43        self._ip_address = None
44        self._port = None
45
46        self.tx_cmd_separator = tx_cmd_separator
47        self.rx_cmd_separator = rx_cmd_separator
48        self.prompt = prompt
49
50    def open(self, host, port=23):
51        self._ip_address = host
52        self._port = port
53        if self._tn:
54            self._tn.close()
55        logging.debug("Telnet Server IP = %s" % host)
56        self._tn = telnetlib.Telnet()
57        self._tn.open(host, port, 10)
58
59    def is_open(self):
60        return bool(self._tn)
61
62    def close(self):
63        if self._tn:
64            self._tn.close()
65            self._tn = None
66
67    def diagnose_telnet(self):
68        """Function that diagnoses telnet connections.
69
70        This function diagnoses telnet connections and can be used in case of
71        command failures. The function checks if the devices is still reachable
72        via ping, and whether or not it can close and reopen the telnet
73        connection.
74
75        Returns:
76            False when telnet server is unreachable or unresponsive
77            True when telnet server is reachable and telnet connection has been
78            successfully reopened
79        """
80        logging.debug('Diagnosing telnet connection')
81        try:
82            job_result = job.run('ping {} -c 5 -i 0.2'.format(
83                self._ip_address))
84        except:
85            logging.error("Unable to ping telnet server.")
86            return False
87        ping_output = job_result.stdout
88        if not re.search(r' 0% packet loss', ping_output):
89            logging.error('Ping Packets Lost. Result: {}'.format(ping_output))
90            return False
91        try:
92            self.close()
93        except:
94            logging.error('Cannot close telnet connection.')
95            return False
96        try:
97            self.open(self._ip_address, self._port)
98        except:
99            logging.error('Cannot reopen telnet connection.')
100            return False
101        logging.debug('Telnet connection likely recovered')
102        return True
103
104    def cmd(self, cmd_str, wait_ret=True, retry=False):
105        if not isinstance(cmd_str, str):
106            raise TypeError('Invalid command string', cmd_str)
107
108        if not self.is_open():
109            raise attenuator.InvalidOperationError(
110                'Telnet connection not open for commands')
111
112        cmd_str.strip(self.tx_cmd_separator)
113        self._tn.read_until(_ascii_string(self.prompt), 2)
114        self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator))
115
116        if wait_ret is False:
117            return None
118
119        match_idx, match_val, ret_text = self._tn.expect(
120            [_ascii_string('\S+' + self.rx_cmd_separator)], 1)
121
122        logging.debug('Telnet Command: {}'.format(cmd_str))
123        logging.debug('Telnet Reply: ({},{},{})'.format(
124            match_idx, match_val, ret_text))
125
126        if match_idx == -1:
127            telnet_recovered = self.diagnose_telnet()
128            if telnet_recovered and retry:
129                logging.debug('Retrying telnet command once.')
130                return self.cmd(cmd_str, wait_ret, retry=False)
131            else:
132                raise attenuator.InvalidDataError(
133                    'Telnet command failed to return valid data')
134
135        ret_text = ret_text.decode()
136        ret_text = ret_text.strip(self.tx_cmd_separator +
137                                  self.rx_cmd_separator + self.prompt)
138
139        return ret_text
140