1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18Class definition of B29 device for controlling the device. 19 20B29 is an engineering device with serial capabilities. It is almost like 21b20 except it has additional features that allow sending commands 22to b10 via one-wire and to pull logs from b10 via one-wire. 23 24Please see https://docs.google.com/document/d/17yJeJRNWxv5E9 25fBvw0sXkgwCBkshU_l4SxWkKgAxVmk/edit for details about available operations. 26""" 27 28import os 29import re 30import time 31from logging import Logger 32 33from acts import utils 34from acts.controllers.buds_lib import tako_trace_logger 35 36logging = tako_trace_logger.TakoTraceLogger(Logger(__file__)) 37DEVICE_REGEX = ( 38 r'_(?P<device_serial>[A-Z0-9]+)-(?P<interface>\w+)\s->\s' 39 r'(\.\./){2}(?P<port>\w+)' 40) 41# TODO: automate getting the latest version from x20 42DEBUG_BRIDGE = ('/google/data/ro/teams/wearables/apollo/ota/jenkins-presubmit/' 43 'ovyalov/master/apollo-sw/CL14060_v2-build13686/v13686/' 44 'automation/apollo_debug_bridge/linux2/apollo_debug_bridge') 45B29_CHIP = 'Cypress_Semiconductor_USBUART' 46 47 48# TODO: 49# as the need arises, additional functionalities of debug_bridge should be 50# integrated 51# TODO: 52# https://docs.google.com/document/d/17yJeJRNWxv5E9fBvw0sXkgwCBkshU_ 53# l4SxWkKgAxVmk/edit 54 55class B29Error(Exception): 56 """Module Level Error.""" 57 58 59def get_b29_devices(): 60 """ Get all available B29 devices. 61 62 Returns: 63 (list) A list of available devices (ex: ['/dev/ttyACM4',...]) or empty 64 list if none found 65 """ 66 devices = [] 67 result = os.popen('ls -l /dev/serial/by-id/*%s*' % B29_CHIP).read() 68 for line in result.splitlines(): 69 match = re.search(DEVICE_REGEX, line) 70 device_serial = match.group('device_serial') 71 log_port = None 72 commander_port = '/dev/' + match.group('port') 73 device = { 74 'commander_port': commander_port, 75 'log_port': log_port, 76 'serial_number': device_serial 77 } 78 devices.append(device) 79 return devices 80 81 82class B29Device(object): 83 """Class to control B29 device.""" 84 85 def __init__(self, b29_serial): 86 """ Class to control B29 device 87 Args: String type of serial number (ex: 'D96045152F121B00' 88 """ 89 self.serial = b29_serial 90 b29_port = [d['commander_port'] for d in get_b29_devices() if 91 d['serial_number'] == b29_serial] 92 if not b29_port: 93 logging.error("unable to find b29 with serial number %s" % 94 b29_serial) 95 raise B29Error( 96 "Recovery failed because b29_serial specified in device " 97 "manifest file is not found or invalid") 98 self.port = b29_port[0] 99 self.ping_match = {'psoc': r'Pings: tx=[\d]* rx=[1-9][0-9]', 100 'csr': r'count=100, sent=[\d]*, received=[1-9][0-9]', 101 'charger': r'Pings: tx=[\d]* rx=[1-9][0-9]'} 102 self.fw_version = self._get_version('fw') 103 self.app_version = self._get_version('app') 104 105 def _get_version(self, type='fw'): 106 """ Method to get version of B29 107 Returns: 108 String version if found (ex: '0006'), None otherwise 109 """ 110 command = '--serial={}'.format(self.port) 111 debug_bridge_process = self._send_command(command=command) 112 if type == 'fw': 113 version_match = re.compile(r'CHARGER app version: version=([\d]*)') 114 elif type == 'app': 115 version_match = re.compile(r'APP VERSION: ([\d]*)') 116 version_str = self._parse_output_of_running_process( 117 debug_bridge_process, version_match) 118 debug_bridge_process.kill() 119 if version_str: 120 match = version_match.search(version_str) 121 version = match.groups()[0] 122 return version 123 return None 124 125 def _parse_output_of_running_process(self, subprocess, match, timeout=30): 126 """ Parses the logs from subprocess objects and checks to see if a 127 match is found within the allotted time 128 Args: 129 subprocess: object returned by _send_command (which is the same as 130 bject returned by subprocess.Popen()) match: regex match object 131 (what is returned by re.compile(r'<regex>') timeout: int - time to 132 keep retrying before bailing 133 134 """ 135 start_time = time.time() 136 success_match = re.compile(match) 137 while start_time + timeout > time.time(): 138 out = subprocess.stderr.readline() 139 if success_match.search(out): 140 return out 141 time.sleep(.5) 142 return False 143 144 def _send_command(self, command): 145 """ Send command to b29 using apollo debug bridge 146 Args: 147 command: The command for apollo debug to execute 148 Returns: 149 subprocess object 150 """ 151 return utils.start_standing_subprocess( 152 '{} {} {}'.format(DEBUG_BRIDGE, '--rpc_port=-1', command), 153 shell=True) 154 155 def restore_golden_image(self): 156 """ Start a subprocess that calls the debug-bridge executable with 157 options that restores golden image of b10 attached to the b29. The 158 recovery restores the 'golden image' which is available in b10 partition 159 8. The process runs for 120 seconds which is adequate time for the 160 recovery to have completed. 161 """ 162 # TODO: 163 # because we are accessing x20, we need to capture error resulting from 164 # expired prodaccess and report it explicitly 165 # TODO: 166 # possibly file not found error? 167 168 # start the process, wait for two minutes and kill it 169 logging.info('Restoring golden image...') 170 command = '--serial=%s --debug_spi=dfu --sqif_partition=8' % self.port 171 debug_bridge_process = self._send_command(command=command) 172 success_match = re.compile('DFU on partition #8 successfully initiated') 173 if self._parse_output_of_running_process(debug_bridge_process, 174 success_match): 175 logging.info('Golden image restored successfully') 176 debug_bridge_process.kill() 177 return True 178 logging.warning('Failed to restore golden image') 179 debug_bridge_process.kill() 180 return False 181 182 def ping_component(self, component, timeout=30): 183 """ Send ping to the specified component via B290 184 Args: 185 component = 'csr' or 'psoc' or 'charger' 186 Returns: 187 True if successful and False otherwise 188 """ 189 if component not in ('csr', 'psoc', 'charger'): 190 raise B29Error('specified parameter for component is not valid') 191 logging.info('Pinging %s via B29...' % component) 192 command = '--serial={} --ping={}'.format(self.port, component) 193 debug_bridge_process = self._send_command(command=command) 194 if self._parse_output_of_running_process(debug_bridge_process, 195 self.ping_match[component], 196 timeout): 197 logging.info('Ping passes') 198 debug_bridge_process.kill() 199 return True 200 else: 201 logging.warning('Ping failed') 202 debug_bridge_process.kill() 203 return False 204 205 def reset_charger(self): 206 """ Send reset command to B29 207 Raises: TimeoutError (lib.utils.TimeoutError) if the device does not 208 come back within 120 seconds 209 """ 210 # --charger_reset 211 if int(self.fw_version) >= 6: 212 logging.info('Resetting B29') 213 command = '--serial={} --charger_reset'.format(self.port) 214 reset_charger_process = self._send_command(command=command) 215 time.sleep(2) 216 reset_charger_process.kill() 217 logging.info('Waiting for B29 to become available..') 218 utils.wait_until(lambda: self.ping_component('charger'), 120) 219 else: 220 logging.warning('B20 firmware version %s does not support ' 221 'charger_reset argument' % self.fw_version) 222