1#!/usr/bin/env python3 2 3# Copyright 2016- The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""A helper module to communicate over telnet with AttenuatorInstruments. 17 18User code shouldn't need to directly access this class. 19""" 20 21import logging 22import telnetlib 23import re 24from acts.controllers import attenuator 25from acts.libs.proc import job 26 27 28def _ascii_string(uc_string): 29 return str(uc_string).encode('ASCII') 30 31 32class _TNHelper(object): 33 """An internal helper class for Telnet+SCPI command-based instruments. 34 35 It should only be used by those implementation control libraries and not by 36 any user code directly. 37 """ 38 def __init__(self, 39 tx_cmd_separator='\n', 40 rx_cmd_separator='\n', 41 prompt=''): 42 self._tn = None 43 self._ip_address = None 44 self._port = None 45 46 self.tx_cmd_separator = tx_cmd_separator 47 self.rx_cmd_separator = rx_cmd_separator 48 self.prompt = prompt 49 50 def open(self, host, port=23): 51 self._ip_address = host 52 self._port = port 53 if self._tn: 54 self._tn.close() 55 logging.debug("Telnet Server IP = %s" % host) 56 self._tn = telnetlib.Telnet() 57 self._tn.open(host, port, 10) 58 59 def is_open(self): 60 return bool(self._tn) 61 62 def close(self): 63 if self._tn: 64 self._tn.close() 65 self._tn = None 66 67 def diagnose_telnet(self): 68 """Function that diagnoses telnet connections. 69 70 This function diagnoses telnet connections and can be used in case of 71 command failures. The function checks if the devices is still reachable 72 via ping, and whether or not it can close and reopen the telnet 73 connection. 74 75 Returns: 76 False when telnet server is unreachable or unresponsive 77 True when telnet server is reachable and telnet connection has been 78 successfully reopened 79 """ 80 logging.debug('Diagnosing telnet connection') 81 try: 82 job_result = job.run('ping {} -c 5 -i 0.2'.format( 83 self._ip_address)) 84 except: 85 logging.error("Unable to ping telnet server.") 86 return False 87 ping_output = job_result.stdout 88 if not re.search(r' 0% packet loss', ping_output): 89 logging.error('Ping Packets Lost. Result: {}'.format(ping_output)) 90 return False 91 try: 92 self.close() 93 except: 94 logging.error('Cannot close telnet connection.') 95 return False 96 try: 97 self.open(self._ip_address, self._port) 98 except: 99 logging.error('Cannot reopen telnet connection.') 100 return False 101 logging.debug('Telnet connection likely recovered') 102 return True 103 104 def cmd(self, cmd_str, wait_ret=True, retry=False): 105 if not isinstance(cmd_str, str): 106 raise TypeError('Invalid command string', cmd_str) 107 108 if not self.is_open(): 109 raise attenuator.InvalidOperationError( 110 'Telnet connection not open for commands') 111 112 cmd_str.strip(self.tx_cmd_separator) 113 self._tn.read_until(_ascii_string(self.prompt), 2) 114 self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator)) 115 116 if wait_ret is False: 117 return None 118 119 match_idx, match_val, ret_text = self._tn.expect( 120 [_ascii_string('\S+' + self.rx_cmd_separator)], 1) 121 122 logging.debug('Telnet Command: {}'.format(cmd_str)) 123 logging.debug('Telnet Reply: ({},{},{})'.format( 124 match_idx, match_val, ret_text)) 125 126 if match_idx == -1: 127 telnet_recovered = self.diagnose_telnet() 128 if telnet_recovered and retry: 129 logging.debug('Retrying telnet command once.') 130 return self.cmd(cmd_str, wait_ret, retry=False) 131 else: 132 raise attenuator.InvalidDataError( 133 'Telnet command failed to return valid data') 134 135 ret_text = ret_text.decode() 136 ret_text = ret_text.strip(self.tx_cmd_separator + 137 self.rx_cmd_separator + self.prompt) 138 139 return ret_text 140