1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Stress test utility for repeating actions repeatedly on android devices. 16 17Configures multiple devices to simultaneously run through the same set of 18actions over and over, while keeping logs from various sources. Primarily 19designed for playing audio to the devices and scanning their log output for 20events, while running other adb commands in between. 21""" 22from __future__ import absolute_import 23from __future__ import division 24from __future__ import print_function 25 26import datetime 27from email import encoders 28from email.mime import text 29import email.mime.base as base 30import email.mime.multipart as multipart 31import logging 32import mimetypes 33import os 34import platform 35import re 36import shlex 37import signal 38import smtplib 39import socket 40import subprocess 41import sys 42import tempfile 43import threading 44import time 45import uuid 46import wave 47from absl import app 48from absl import flags 49import pexpect 50import queue 51import stress_test_common 52import stress_test_pb2 53from google.protobuf import text_format 54 55_SUMMARY_LINES = "-" * 73 56 57if sys.platform.startswith("win"): 58 pexpect = None 59 60_SUMMARY_COLUMNS = ( 61 "| Event Type | Event Count | Consecutive no event |") 62_SUMMARY_COL_FORMATT = "|%-25.25s|% 22d|% 22d|" 63 64FLAGS = flags.FLAGS 65flags.DEFINE_string("notification_address", "", 66 "Email address where to send notification events. Will " 67 "default to $USER@google.com if not provided. No emails " 68 "will be sent if suppress_notification_emails is True.") 69flags.DEFINE_bool("suppress_notification_emails", False, 70 "Prevents emails from being sent as notifications if True.") 71flags.DEFINE_string("test_name", None, 72 "Name of stress test to run. For example, if you set this " 73 "to 'dsp_trigger_sw_rejection', the stress test in " 74 "'stress_test.dsp_trigger_sw_rejection.ascii_proto' will " 75 "be loaded and executed.") 76# flags.mark_flag_as_required("test_name") 77flags.DEFINE_string("output_root", "./", 78 "Path where directory should be generated containing all " 79 "logs from devices and moved files.") 80flags.DEFINE_integer("num_iterations", None, 81 "If set to a positive number, the number of iterations of " 82 "the stress test to run. Otherwise, the test runs " 83 "forever.") 84flags.DEFINE_list("devices", [], 85 "Serial numbers of devices that should be included in the " 86 "stress test. If empty, all devices will be used.") 87flags.DEFINE_integer("print_summary_every_n", 10, 88 "Prints the summary to the log file every n iterations.") 89 90flags.DEFINE_string("email_sender_address", "", 91 "Account to use for sending notification emails.") 92flags.DEFINE_string("email_sender_password", "", 93 "Password to use for notification email account.") 94flags.DEFINE_string("email_smtp_server", "smtp.gmail.com", 95 "SMTP server to use for sending notification emails.") 96flags.DEFINE_integer("email_smtp_port", 465, 97 "Port to use for the notification SMTP server.") 98flags.DEFINE_integer("device_settle_time", 5, 99 "Time to wait for devices to settle.") 100flags.DEFINE_bool("use_sox", platform.system() != "Windows", 101 "Use sox for playback, otherwise, attempt to use platform " 102 "specific features.") 103flags.DEFINE_bool("attach_bugreport", True, 104 "Attach bugreport to email if test failed.") 105flags.DEFINE_bool("delete_data_dir", False, 106 "If true, code will delete all the files generated by this " 107 "test at the end.") 108 109if platform.system().startswith("CYGWIN"): 110 FLAGS.device_settle_time = 30 111 112 113def QueueWorker(worker_queue): 114 while True: 115 work = worker_queue.get() 116 try: 117 work() 118 except: # pylint:disable=bare-except 119 logging.exception("Exception in worker queue - task remains uncompleted.") 120 worker_queue.task_done() 121 122 123def SendNotificationEmail(subject, body, bugreport=None): 124 """Sends an email with the specified subject and body. 125 126 Also attach bugreport if bugreport location is provided as argument 127 128 Args: 129 subject: Subject of the email. 130 body: Body of the email. 131 bugreport: If provided, it will be attach to the email. 132 """ 133 if FLAGS.suppress_notification_emails: 134 logging.info("Email with subject '%s' has been suppressed", subject) 135 return 136 try: 137 # Assemble the message to send. 138 recpient_address = FLAGS.notification_address 139 message = multipart.MIMEMultipart("alternative") 140 message["From"] = "Stress Test on %s" % socket.gethostname() 141 message["To"] = recpient_address 142 message["Subject"] = subject 143 message.attach(text.MIMEText(body, "plain")) 144 message.attach(text.MIMEText("<pre>%s</pre>" % body, "html")) 145 146 if FLAGS.attach_bugreport and bugreport: 147 # buildozer: disable=unused-variable 148 ctype, _ = mimetypes.guess_type(bugreport) 149 maintype, subtype = ctype.split("/", 1) 150 with open(bugreport, "rb") as fp: 151 att = base.MIMEBase(maintype, subtype) 152 att.set_payload(fp.read()) 153 encoders.encode_base64(att) 154 att.add_header("Content-Disposition", "attachment", filename=bugreport) 155 message.attach(att) 156 157 # Send the message from our special account. 158 server = smtplib.SMTP_SSL(FLAGS.email_smtp_server, FLAGS.email_smtp_port) 159 server.login(FLAGS.email_sender_address, FLAGS.email_sender_password) 160 server.sendmail(FLAGS.email_sender_address, recpient_address, 161 message.as_string()) 162 server.quit() 163 logging.info("Email with subject '%s' has been sent", subject) 164 except: # pylint:disable=bare-except 165 logging.exception("Failed to send notification email") 166 167 168class ProcessLogger(threading.Thread): 169 170 class EventScanner(object): 171 172 def __init__(self, name, process_name, regexes): 173 """Struct to store the data about an event. 174 175 Args: 176 name: Name of event. 177 process_name: Name of the process that is being logged. 178 regexes: An iteratable of regex strings that indicate an event has 179 happened. 180 """ 181 182 self.name = name 183 self.process_name = process_name 184 self.searches = [re.compile(regex).search for regex in regexes] 185 self.count = 0 186 187 def ScanForEvent(self, line, lock=None): 188 """Checks the line for matches. If found, updates the internal counter.""" 189 190 for search in self.searches: 191 if search(line.decode("utf-8")): 192 # Grab the lock (if provided), update the counter, and release it. 193 if lock: lock.acquire() 194 self.count += 1 195 if lock: lock.release() 196 logging.info("Event '%s' detected on %s", self.name, 197 self.process_name) 198 199 def __init__(self, name, command, output, events, 200 restart_process, repeats_output_when_opened): 201 """Threaded class that monitors processes for events, and logs output. 202 203 Args: 204 name: The name of the process being logged. 205 command: A list of arguments to be passed to the subprocess to execute. 206 output: Name of output file to write process stdout to. If blank or None, 207 will not be generated. 208 events: An iterable of LoggingEventConfigs to look for in the output. 209 restart_process: Restart the process if it terminates by itself. This 210 should typically be true, but false for processes that only should be 211 run once and have their output logged. 212 repeats_output_when_opened: Set to true if the process will repeat the 213 output of a previous call when it is restarted. This will prevent 214 duplicate lines from being logged. 215 """ 216 super(ProcessLogger, self).__init__() 217 self.name = name 218 self.command = command 219 self.restart_process = restart_process 220 self.repeats_output_when_opened = repeats_output_when_opened 221 self.process = None 222 self.lock = threading.Lock() 223 self.looking = False 224 225 # Compile the list of regexes that we're supposed to be looking for. 226 self.events = [] 227 for event in events: 228 self.events.append(ProcessLogger.EventScanner(event.name, self.name, 229 event.regex)) 230 231 if output: 232 stress_test_common.MakeDirsIfNeeded(os.path.dirname(output)) 233 self.output_fp = open(output, "w", encoding="utf-8") 234 logging.info("Logging device info to %s", output) 235 else: 236 self.output_fp = None 237 238 def GetEventCountsSinceLastCall(self): 239 """Returns the counts of all events since this method was last called.""" 240 event_map = {} 241 self.lock.acquire() 242 for event in self.events: 243 event_map[event.name] = event.count 244 event.count = 0 245 self.lock.release() 246 return event_map 247 248 def run(self): 249 last_line = None 250 should_log = True 251 first_run = True 252 skip_exception_line = False 253 self.lock.acquire() 254 last_run_time = 0 255 while self.restart_process: 256 self.lock.release() 257 if not first_run: 258 logging.info("Restarting process %s", "".join(str(self.command))) 259 time_since_last_run = datetime.datetime.now() - last_run_time 260 if time_since_last_run.total_seconds() < 1.0: 261 needed_delay = 1.0 - time_since_last_run.total_seconds() 262 logging.info("Delaying for %.2f seconds", needed_delay) 263 time.sleep(needed_delay) 264 else: 265 first_run = False 266 267 try: 268 if pexpect: 269 self.process = pexpect.spawn(" ".join(self.command), timeout=None) 270 output_source = self.process 271 else: 272 self.process = subprocess.Popen(self.command, stdout=subprocess.PIPE) 273 output_source = self.process.stdout 274 last_run_time = datetime.datetime.now() 275 for line in output_source: 276 # If the process we're logging likes to repeat its output, we need to 277 # look for the last line we saw before we start doing anything with 278 # these lines anymore. 279 if self.repeats_output_when_opened: 280 if not should_log: 281 if last_line == line: 282 should_log = True 283 continue 284 elif skip_exception_line: 285 # ignore the last line which caused UnicodeEncodeError 286 skip_exception_line = False 287 continue 288 289 if self.output_fp: 290 self.output_fp.write(line.decode("utf-8", "backslashreplace").rstrip()) 291 self.output_fp.write("\n") 292 293 # Loop through all events we're watching for, to see if they occur on 294 # this line. If they do, update the fact that we've seen this event. 295 for event in self.events: 296 if self.looking: 297 event.ScanForEvent(line, lock=self.lock) 298 last_line = line 299 except UnicodeEncodeError: 300 logging.exception("UnicodeEncodeError on running logger process") 301 skip_exception_line = True 302 except: # pylint:disable=bare-except 303 logging.exception("Exception encountered running process") 304 finally: 305 if pexpect: 306 self.process.terminate() 307 else: 308 self.process.send_signal(signal.SIGTERM) 309 should_log = False 310 self.lock.acquire() 311 312 self.lock.release() 313 if pexpect: 314 if self.process.exitstatus is not None: 315 logging.info("Process finished - exit code %d", self.process.exitstatus) 316 else: 317 logging.info("Process finished - signal code %d", 318 self.process.signalstatus) 319 else: 320 if self.process.returncode is not None: 321 logging.info("Process finished - return code %d", 322 self.process.returncode) 323 else: 324 logging.info("Process finished - no return code") 325 326 def StopLogging(self): 327 if self.process: 328 self.lock.acquire() 329 self.restart_process = False 330 self.lock.release() 331 332 if pexpect: 333 self.process.kill(signal.SIGHUP) 334 self.process.kill(signal.SIGINT) 335 else: 336 self.process.send_signal(signal.SIGTERM) 337 338 339class Device(object): 340 341 SECONDS_TO_SLEEP_DURING_ROOT = 0.5 342 343 def __init__(self, serial_number, output_root, test_events, expected_result): 344 """Responsible for monitoring a specific device, and pulling files from it. 345 346 The actual work of the constructor will be handled asynchronously, you must 347 call WaitForTasks() before using the device. 348 349 Args: 350 serial_number: The device serial number. 351 output_root: The directory where to output log files/anything pulled from 352 the device. 353 test_events: The events (with conditions) that come from the StressTest 354 that should be evaluated at every iteration, along with a list of 355 actions to take when one of these events occur. For example, if there 356 have not been any detected hotword triggers, a bugreport can be 357 generated. 358 expected_result: Expected event count to pass the test. 359 """ 360 self.serial_number = serial_number 361 self.output_root = output_root 362 self.cmd_string_replacements = {} 363 self.iteration = 0 364 self.cmd_string_replacements["iteration"] = 0 365 self.cmd_string_replacements["serial_number"] = serial_number 366 self.cmd_string_replacements["output_root"] = output_root 367 self.name = None 368 self.process_loggers = [] 369 self.event_log = stress_test_pb2.EventLog() 370 self.cnt_per_iteration = expected_result 371 372 # Prepare the work queue, and offload the rest of the init into it. 373 self.work_queue = queue.Queue() 374 self.worker = threading.Thread(target=QueueWorker, args=[self.work_queue]) 375 self.worker.daemon = True 376 self.worker.name = self.name 377 self.worker.start() 378 self.abort_requested = False 379 self.remove_device = False 380 self.test_events = test_events 381 382 self.work_queue.put(self.__init_async__) 383 384 def __init_async__(self): 385 # Get the device type, and append it to the serial number. 386 self.device_type = self.Command(["shell", "getprop", 387 "ro.product.name"]).strip().decode("utf-8") 388 self.name = "%s_%s" % (self.device_type, self.serial_number) 389 self.worker.name = self.name 390 self.cmd_string_replacements["device"] = self.name 391 logging.info("Setting up device %s", self.name) 392 393 config = stress_test_common.LoadDeviceConfig(self.device_type, 394 self.serial_number) 395 396 # Get the device ready. 397 self.Root() 398 399 # Run any setup commands. 400 for cmd in config.setup_command: 401 result = self.Command( 402 shlex.split(cmd % self.cmd_string_replacements)).strip() 403 if result: 404 for line in result.splitlines(): 405 logging.info(line) 406 407 self.files_to_move = config.file_to_move 408 409 self.event_names = set([event.name for event in config.event]) 410 self.event_counter = {name: 0 for name in self.event_names} 411 self.iterations_since_event = {name: 0 for name in self.event_names} 412 413 for file_to_watch in config.file_to_watch: 414 # Are there any events that match up with this file? 415 events = [x for x in config.event if x.source == file_to_watch.source] 416 417 if file_to_watch.source == "LOGCAT": 418 command = [ 419 "adb", "-s", self.serial_number, "logcat", "-v", "usec", "" 420 ] 421 command.extend(["%s:S" % tag for tag in config.tag_to_suppress]) 422 name = "logcat_" + self.serial_number 423 else: 424 command = [ 425 "adb", "-s", self.serial_number, "shell", 426 "while : ; do cat %s 2>&1; done" % file_to_watch.source 427 ] 428 name = "%s_%s" % (os.path.basename( 429 file_to_watch.source), self.serial_number) 430 431 process_logger = ProcessLogger( 432 name, command, os.path.join( 433 self.output_root, 434 file_to_watch.destination % self.cmd_string_replacements), 435 events, True, file_to_watch.repeats_output_on_open) 436 self.process_loggers.append(process_logger) 437 process_logger.start() 438 439 # Add any of the background processes. 440 for daemon_process in config.daemon_process: 441 # Are there any events that match up with this file? 442 events = [x for x in config.event if x.source == daemon_process.name] 443 command = shlex.split( 444 daemon_process.command % self.cmd_string_replacements) 445 if daemon_process.destination: 446 output = os.path.join( 447 self.output_root, 448 daemon_process.destination % self.cmd_string_replacements) 449 else: 450 output = None 451 name = "%s_%s" % (daemon_process.name, self.serial_number) 452 process_logger = ProcessLogger(name, command, output, events, 453 daemon_process.restart, 454 daemon_process.repeats_output_on_open) 455 self.process_loggers.append(process_logger) 456 process_logger.start() 457 458 # Build up the list of events we can actually process. 459 self.__UpdateEventCounters(number_of_iterations=0) 460 test_events = self.test_events 461 self.test_events = [] 462 for event in test_events: 463 try: 464 eval(event.condition, # pylint:disable=eval-used 465 {"__builtins__": None}, self.__ValuesInEval()) 466 self.test_events.append(event) 467 except Exception as err: # pylint:disable=broad-except 468 logging.error("Test event %s is not compatible with %s", event.name, 469 self.name) 470 logging.error(str(err)) 471 # Make sure that device specific events don't have conditions. 472 self.device_events = [] 473 for event in config.test_event: 474 if not event.name: 475 logging.error("Device %s test event is missing a name", self.name) 476 continue 477 if event.condition: 478 self.test_events.append(event) 479 else: 480 self.device_events.append(event) 481 482 def StartLookingForEvents(self): 483 """Starts all child ProcessLoggers to start looking for events.""" 484 for process_logger in self.process_loggers: 485 process_logger.looking = True 486 487 def __ValuesInEval(self): 488 values_in_eval = {key: value for key, value 489 in list(self.event_counter.items())} 490 for key, value in list(self.iterations_since_event.items()): 491 values_in_eval["iterations_since_%s" % key] = value 492 return values_in_eval 493 494 def __GetExpectedEventCount(self, event): 495 if event == "logcat_iteration": 496 return -1 497 try: 498 event_cnt = getattr(self.cnt_per_iteration, event) 499 except AttributeError: 500 event_cnt = -1 501 logging.exception("%s is not an attribute of expected_result", event) 502 return event_cnt 503 504 def __UpdateEventCounters(self, number_of_iterations=1): 505 # Update the event counters 506 visited_events = set() 507 error_log = [] 508 for process_logger in self.process_loggers: 509 events = process_logger.GetEventCountsSinceLastCall() 510 for event, count in list(events.items()): 511 # Print log when there is any missed event 512 expected_count = self.__GetExpectedEventCount(event) 513 514 if expected_count > 0: 515 if count > expected_count * number_of_iterations: 516 logging.info( 517 "[STRESS_TEST] In iteration %d, got duplicated %s : %d", 518 self.iteration, event, count) 519 logging.info("[STRESS_TEST] Will count only : %d", 520 expected_count * number_of_iterations) 521 count = expected_count * number_of_iterations 522 523 if count: 524 self.event_counter[event] += count 525 visited_events.add(event) 526 527 if expected_count >= 0: 528 if expected_count * number_of_iterations != count: 529 error_log.append( 530 _SUMMARY_COL_FORMATT % 531 (event, count, expected_count * number_of_iterations)) 532 533 # Go clear all the events that weren't consecutive. 534 for event in self.iterations_since_event: 535 if event in visited_events: 536 self.iterations_since_event[event] = 0 537 else: 538 self.iterations_since_event[event] += number_of_iterations 539 540 if error_log: 541 logging.info(_SUMMARY_LINES) 542 logging.info(" iteration %d : Something wrong in %s.", 543 self.iteration, self.name) 544 logging.info(_SUMMARY_LINES) 545 logging.info(_SUMMARY_COLUMNS) 546 logging.info(_SUMMARY_LINES) 547 for line in error_log: 548 logging.info(line) 549 logging.info(_SUMMARY_LINES) 550 551 def ProcessEvents(self): 552 """Updates the event_counter and iterations_since_event maps.""" 553 self.work_queue.put(self.__ProcessEventsAsync) 554 555 def __ProcessEventsAsync(self): 556 # Move any files to the local machine that should be moved. 557 if self.files_to_move: 558 for file_to_move in self.files_to_move: 559 try: 560 self.Command(["pull", file_to_move.source, file_to_move.destination]) 561 except: # pylint:disable=bare-except 562 logging.exception("Failed to pull %s", file_to_move.source) 563 564 self.__UpdateEventCounters() 565 566 for event in self.test_events: 567 if eval(event.condition, # pylint:disable=eval-used 568 {"__builtins__": None}, self.__ValuesInEval()): 569 logging.info("Condition has been met for event '%s'", event.name) 570 # Write the updated event log. 571 event_log_details = self.event_log.event.add() 572 event_log_details.iteration = self.iteration 573 event_log_details.name = event.name 574 with open(os.path.join(self.output_root, 575 "%s_event_log.ascii_proto" % self.name), 576 "w") as fp: 577 text_format.PrintMessage(self.event_log, fp) 578 579 # Do whatever other actions that are part of the event. 580 self.__ProcessEventActionQueue(event) 581 582 # Run any device specific actions for this event. 583 for device_event in self.device_events: 584 if device_event.name == event.name: 585 self.__ProcessEventActionQueue(device_event) 586 587 # Set up the next iteration. 588 self.iteration += 1 589 self.cmd_string_replacements["iteration"] = self.iteration 590 591 def __ProcessEventActionQueue(self, event): 592 bugreport = None 593 for action in event.action: 594 if action == "BUGREPORT": 595 bugreport = self.TakeBugReport() 596 elif action.startswith("DUMPSYS "): 597 self.CaptureDumpsys(action[action.find(" ") + 1:]) 598 elif action == "NOTIFY": 599 SendNotificationEmail( 600 "%s had event '%s' occur" % (self.name, event.name), 601 "\n".join(["Current Summary:"] + self.GetSummaryLines()), bugreport) 602 elif action == "REMOVE_DEVICE": 603 logging.info("Removing %s from the test", self.serial_number) 604 self.remove_device = True 605 elif action == "ABORT": 606 logging.info("Abort requested") 607 self.abort_requested = True 608 else: 609 action %= self.cmd_string_replacements 610 logging.info("Running command %s on %s", action, self.name) 611 result = self.Command(shlex.split(action)).strip() 612 if result: 613 for line in result.splitlines(): 614 logging.info(line) 615 616 def Root(self): 617 self.Command(["root"]) 618 time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT) 619 self.Command(["wait-for-device"]) 620 time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT) 621 622 def Stop(self): 623 """Stops all file loggers attached to this device.""" 624 for process_logger in self.process_loggers: 625 process_logger.StopLogging() 626 self.process_loggers = [] 627 628 def Join(self): 629 for process_logger in self.process_loggers: 630 process_logger.join() 631 self.WaitForTasks() 632 633 def AsyncCommand(self, command, log_output=False): 634 self.work_queue.put( 635 lambda: self.__AsyncCommand(command, log_output=log_output)) 636 637 def __AsyncCommand(self, command, log_output=False): 638 result = self.Command(command).strip() 639 if result and log_output: 640 # log both logcat and stress testing log 641 # some test will depend on adb command output (ex: dumpsys) 642 self.Command(['shell', 'log', '-t', 'STRESS_TEST', result]) 643 for line in result.splitlines(): 644 logging.info(line.decode("utf-8")) 645 646 def Command(self, command): 647 """Runs the provided command on this device.""" 648 if command[0] in {"bugreport", "root", "wait-for-device", "shell", 649 "logcat"}: 650 return subprocess.check_output( 651 ["adb", "-s", self.serial_number] + command) 652 elif command[0] == "DUMPSYS": 653 self.CaptureDumpsys(command[1]) 654 return "" 655 elif command[0] == "pull": 656 try: 657 files = subprocess.check_output( 658 ["adb", "-s", self.serial_number, "shell", "ls", command[1]] 659 ).strip().splitlines() 660 except subprocess.CalledProcessError: 661 return "" 662 if len(files) == 1 and "No such file or directory" in files[0]: 663 return "" 664 for source_file in files: 665 destination = os.path.join(self.output_root, 666 command[2] % self.cmd_string_replacements) 667 stress_test_common.MakeDirsIfNeeded(os.path.dirname(destination)) 668 logging.info("Moving %s from %s to %s", source_file, self.name, 669 destination) 670 subprocess.check_output(["adb", "-s", self.serial_number, "pull", 671 source_file, destination]) 672 if FLAGS.delete_data_dir: 673 subprocess.check_output([ 674 "adb", "-s", self.serial_number, "shell", "rm", "-rf", source_file 675 ]) 676 return "" 677 else: 678 return subprocess.check_output(command) 679 680 def TakeBugReport(self): 681 logging.info("Capturing bugreport on %s", self.name) 682 bugreport = os.path.join(self.output_root, 683 "%s_bugreport_iteration_%06d.zip" % 684 (self.name, self.iteration)) 685 sdk = int(self.Command( 686 ["shell", "getprop", "ro.build.version.sdk"]).strip()) 687 if sdk >= 24: # SDK 24 = Android N 688 with open(bugreport, "wb") as bugreport_fp: 689 bugreport_fp.write(self.Command(["bugreport", bugreport])) 690 else: 691 bugreport_txt = os.path.join(self.output_root, 692 "%s_bugreport_iteration_%06d.txt" % 693 (self.name, self.iteration)) 694 with open(bugreport_txt, "wb") as bugreport_fp: 695 bugreport_fp.write(self.Command(["bugreport"])) 696 self.Command(["zip", bugreport, bugreport_txt]) 697 698 self.Command(["pull", "/data/anr/traces.txt", 699 "%s_traces_iteration_%06d.txt" % (self.name, self.iteration)]) 700 self.Command(["pull", "/data/anr/traces.txt.bugreport", 701 "%s_traces_iteration_%06d.txt.bugreport" % (self.name, 702 self.iteration)]) 703 return bugreport 704 705 def CaptureDumpsys(self, dumpsys_unit): 706 logging.info("Taking dumpsys %s on %s", dumpsys_unit, self.name) 707 stress_test_common.MakeDirsIfNeeded(os.path.join(self.output_root, 708 self.name)) 709 with open(os.path.join(self.output_root, self.name, 710 "%s_%06d.txt" % (dumpsys_unit, self.iteration)), 711 "w") as dumpsys_fp: 712 dumpsys_fp.write(self.Command(["shell", "dumpsys", dumpsys_unit])) 713 714 def WaitForTasks(self): 715 self.work_queue.join() 716 717 def GetSummaryLines(self): 718 lines = [ 719 "Device {}".format(self.name), 720 _SUMMARY_LINES, _SUMMARY_COLUMNS, _SUMMARY_LINES 721 ] 722 for event, count in sorted(self.event_counter.items()): 723 lines.append(_SUMMARY_COL_FORMATT % ( 724 event, count, self.iterations_since_event[event])) 725 lines.append(_SUMMARY_LINES) 726 return lines 727 728 729def RunAsyncCommand(devices, command): 730 """Helper function for running async commands on many devices.""" 731 for device in devices: 732 device.AsyncCommand(command) 733 for device in devices: 734 device.WaitForTasks() 735 736 737class StressTest(object): 738 """Manages dispatching commands to devices/playing audio and events.""" 739 740 def __init__(self, output_root, test_name): 741 self.output_root = output_root 742 self.devices = [] 743 self.test_name = test_name 744 config = stress_test_pb2.StressTestConfig() 745 config_contents = stress_test_common.GetResourceContents( 746 os.path.join(stress_test_common.RESOURCE_DIR, 747 "stress_test.%s.ascii_proto" % test_name)) 748 text_format.Merge(config_contents, config) 749 self.events = config.event 750 self.setup_commands = config.setup_command 751 self.steps = config.step 752 self.audio_tempfiles = {} 753 self.uuid = str(uuid.uuid4()) 754 self.expected_result = None 755 self.iteration = 0 756 if config.expected_result: 757 self.expected_result = config.expected_result[0] 758 759 # Place all the audio files into temp files. 760 for step in self.steps: 761 if step.audio_file and step.audio_file not in self.audio_tempfiles: 762 # We can't delete the temp file on windows, since it gets nuked too 763 # early. 764 audio_tempfile = tempfile.NamedTemporaryFile( 765 delete=(platform.system() != "Windows"), 766 dir="." if platform.system().startswith("CYGWIN") else None 767 ) 768 if platform.system().startswith("CYGWIN"): 769 audio_tempfile.name = os.path.basename(audio_tempfile.name) 770 self.audio_tempfiles[step.audio_file] = audio_tempfile 771 if FLAGS.use_sox: 772 # Write out the raw PCM samples as a wave file. 773 audio_tempfile.write( 774 stress_test_common.GetResourceContents(step.audio_file)) 775 else: 776 # Make a temporary wave file for playout if we can't use sox. 777 wavefile = wave.open(audio_tempfile, "wb") 778 if step.audio_file_sample_rate <= 0: 779 step.audio_file_sample_rate = 16000 780 wavefile.setframerate(step.audio_file_sample_rate) 781 if step.audio_file_num_channels <= 0: 782 step.audio_file_num_channels = 1 783 wavefile.setnchannels(step.audio_file_num_channels) 784 if not step.audio_file_format: 785 wavefile.setsampwidth(2) 786 elif step.audio_file_format == "s8": 787 wavefile.setsampwidth(1) 788 elif step.audio_file_format == "s16": 789 wavefile.setsampwidth(2) 790 elif step.audio_file_format == "s32": 791 wavefile.setsampwidth(4) 792 else: 793 raise RuntimeError( 794 "Unsupported wave file format for %s" % step.audio_file) 795 wavefile.writeframes(stress_test_common.GetResourceContents( 796 step.audio_file)) 797 wavefile.close() 798 audio_tempfile.flush() 799 800 if platform.system() == "Windows": 801 audio_tempfile.close() 802 803 # Create all the devices that are attached to this machine. 804 for serial_number in self.GetActiveSerialNumbers(): 805 self.devices.append( 806 Device(serial_number, output_root, self.events, self.expected_result)) 807 if not self.devices: 808 raise app.UsageError("No devices connected") 809 810 self.devices.sort(key=lambda x: x.name) 811 812 # Make sure every device is done with their work for setup. 813 for device in self.devices: 814 device.WaitForTasks() 815 816 # Write out the info meta-data proto. Useful for doing analysis of the logs 817 # after the stress test has completed. 818 stress_test_info = stress_test_pb2.StressTestInfo() 819 stress_test_info.test_name = self.test_name 820 stress_test_info.test_description = config.description 821 stress_test_info.uuid = self.uuid 822 for device in self.devices: 823 device_pb = stress_test_info.device.add() 824 device_pb.device_type = device.device_type 825 device_pb.serial_number = device.serial_number 826 827 text_format.PrintMessage(stress_test_info, open(os.path.join( 828 self.output_root, "stress_test_info.ascii_proto"), "w")) 829 830 def GetActiveSerialNumbers(self): 831 serial_numbers = [] 832 for line in sorted( 833 subprocess.check_output(["adb", "devices"]).splitlines()): 834 if line.endswith(b"device"): 835 serial_number = line.split()[0].strip() 836 if FLAGS.devices and serial_number not in FLAGS.devices: 837 continue 838 serial_numbers.append(serial_number.decode("utf-8")) 839 return serial_numbers 840 841 def Start(self): 842 logging.info("Waiting for devices to settle") 843 time.sleep(5) 844 # Make a copy of the device list, as we'll be modifying this actual list. 845 devices = list(self.devices) 846 dropped_devices = [] 847 848 # If we have any setup commands, run them. 849 for command in self.setup_commands: 850 logging.info("Running command %s", command) 851 # Can't use the async command helper function since we need to get at 852 # the device cmd_string_replacements. 853 for device in devices: 854 device.AsyncCommand( 855 shlex.split(command % device.cmd_string_replacements), 856 log_output=True) 857 for device in devices: 858 device.WaitForTasks() 859 860 for device in devices: 861 device.StartLookingForEvents() 862 device.AsyncCommand(["shell", "log", "-t", "STRESS_TEST", 863 "Starting {%s} TZ=$(getprop persist.sys.timezone) " 864 "YEAR=$(date +%%Y)" % self.uuid], True) 865 self.iteration = 0 866 while True: 867 logging.info("Starting iteration %d", self.iteration) 868 # Perform all the actions specified in the test. 869 RunAsyncCommand(devices, [ 870 "shell", "log", "-t", "STRESS_TEST", 871 "Performing iteration %d $(head -n 3 " 872 "/proc/timer_list | tail -n 1)" % self.iteration 873 ]) 874 875 for step in self.steps: 876 if step.delay_before: 877 logging.info("Waiting for %.2f seconds", step.delay_before) 878 time.sleep(step.delay_before) 879 880 if step.audio_file: 881 logging.info("Playing %s", step.audio_file) 882 RunAsyncCommand(devices, ["shell", "log", "-t", "STRESS_TEST", 883 "Playing %s" % step.audio_file]) 884 885 if FLAGS.use_sox: 886 subprocess.check_call(["sox", "-q", 887 self.audio_tempfiles[step.audio_file].name, 888 "-d"]) 889 elif platform.system() == "Windows": 890 import winsound # pylint:disable=g-import-not-at-top 891 winsound.PlaySound(self.audio_tempfiles[step.audio_file].name, 892 winsound.SND_FILENAME | winsound.SND_NODEFAULT) 893 else: 894 raise app.RuntimeError("Unsupported platform for audio playback") 895 896 if step.command: 897 logging.info("Running command %s", step.command) 898 # Can't use the async command helper function since we need to get at 899 # the device cmd_string_replacements. 900 for device in devices: 901 device.AsyncCommand( 902 shlex.split(step.command % device.cmd_string_replacements), 903 log_output=True) 904 for device in devices: 905 device.WaitForTasks() 906 907 if step.delay_after: 908 logging.info("Waiting for %.2f seconds", step.delay_after) 909 time.sleep(step.delay_after) 910 911 RunAsyncCommand(devices, [ 912 "shell", "log", "-t", "STRESS_TEST", 913 "Iteration %d complete $(head -n 3 " 914 "/proc/timer_list | tail -n 1)" % self.iteration 915 ]) 916 self.iteration += 1 917 918 # TODO(somebody): Sometimes the logcat seems to get stuck and buffers for 919 # a bit. This throws off the event counts, so we should probably add some 920 # synchronization rules before we trigger any events. 921 922 # Go through each device, update the event counter, and see if we need to 923 # trigger any events. 924 devices_to_remove = [] 925 abort_requested = False 926 active_devices = self.GetActiveSerialNumbers() 927 for device in devices: 928 if device.serial_number in active_devices: 929 device.ProcessEvents() 930 else: 931 logging.error("Dropped device %s", device.name) 932 SendNotificationEmail( 933 "Dropped device %s" % device.name, 934 "Device %s is not longer present in the system" % device.name) 935 dropped_devices.append(device) 936 devices_to_remove.append(device) 937 938 # Check to see if any of the dropped devices have come back. If yes, grab 939 # a bug report. 940 for device in dropped_devices: 941 if device.serial_number in active_devices: 942 logging.info("Device %s reappeared", device.name) 943 device.Root() 944 device.TakeBugReport() 945 946 dropped_devices = [d for d in dropped_devices 947 if d.serial_number not in active_devices] 948 949 for device in devices: 950 device.WaitForTasks() 951 if device.remove_device: 952 devices_to_remove.append(device) 953 if device.abort_requested: 954 abort_requested = True 955 956 # Remove devices from our list of things to monitor if they've been marked 957 # for deletion. 958 if devices_to_remove: 959 for device in devices_to_remove: 960 device.Stop() 961 devices = [d for d in devices if d not in devices_to_remove] 962 963 # Print out the iteration summary. 964 if self.iteration % FLAGS.print_summary_every_n == 0: 965 for line in self.GetSummaryLines(): 966 logging.info(line) 967 968 # See if we need to break out of the outer loop. 969 if abort_requested or not devices: 970 break 971 if FLAGS.num_iterations: 972 if self.iteration >= FLAGS.num_iterations: 973 logging.info("Completed full iteration : %d", self.iteration) 974 break 975 SendNotificationEmail( 976 "Stress test %s completed" % (FLAGS.test_name), 977 "\n".join(["Summary:"] + self.GetSummaryLines())) 978 979 def Stop(self): 980 logging.debug("Stopping devices") 981 for device in self.devices: 982 device.Stop() 983 for device in self.devices: 984 device.Join() 985 986 def GetSummaryLines(self): 987 lines = [ 988 _SUMMARY_LINES, 989 "Conducted %d iterations out of %d" % 990 (self.iteration, FLAGS.num_iterations), 991 _SUMMARY_LINES 992 ] 993 for device in self.devices: 994 lines.extend(device.GetSummaryLines()) 995 lines.append(_SUMMARY_LINES) 996 return lines 997 998 999def main(unused_argv): 1000 # Check to make sure that there are no other instances of ADB running - if 1001 # there are, print a warning and wait a bit for them to see it and decide if 1002 # they want to keep running, knowing that logs may be invalid. 1003 try: 1004 if "adb" in subprocess.check_output(["ps", "-ale"]).decode("utf-8"): 1005 print("It looks like there are other instances of adb running. If these " 1006 "other instances are also cating log files, you will not be " 1007 "capturing everything in this stress test (so logs will be " 1008 "invalid).") 1009 print("Continuing in 3...", end=" ") 1010 sys.stdout.flush() 1011 for i in [2, 1, 0]: 1012 time.sleep(1) 1013 if i: 1014 print("%d..." % i, end=" ") 1015 else: 1016 print("") 1017 sys.stdout.flush() 1018 except OSError: 1019 print("Unexpected error:", sys.exc_info()[0]) 1020 if sys.platform.startswith("win"): 1021 pass 1022 else: 1023 raise 1024 1025 # Make the base output directory. 1026 output_root = os.path.join(FLAGS.output_root, "%s_%s" % ( 1027 FLAGS.test_name, datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))) 1028 # output_root = os.path.join(FLAGS.output_root, FLAGS.test_name) 1029 stress_test_common.MakeDirsIfNeeded(output_root) 1030 1031 # Set up logging. 1032 formatter = logging.Formatter( 1033 "%(levelname)-1.1s %(asctime)s [%(threadName)-16.16s] %(message)s") 1034 root_logger = logging.getLogger() 1035 root_logger.setLevel(logging.INFO) 1036 root_logger.setLevel(logging.DEBUG) 1037 1038 file_handler = logging.FileHandler(os.path.join(output_root, 1039 "stress_test.log")) 1040 file_handler.setFormatter(formatter) 1041 root_logger.addHandler(file_handler) 1042 1043 console_handler = logging.StreamHandler() 1044 console_handler.setFormatter(formatter) 1045 root_logger.addHandler(console_handler) 1046 1047 stress_test = StressTest(output_root, FLAGS.test_name) 1048 try: 1049 stress_test.Start() 1050 finally: 1051 logging.info("Stopping device logging threads") 1052 stress_test.Stop() 1053 for line in stress_test.GetSummaryLines(): 1054 logging.info(line) 1055 if FLAGS.delete_data_dir: 1056 print("Deleting Data Dir") 1057 subprocess.check_output(["rm", "-r", "-f", output_root]) 1058 1059 1060if __name__ == "__main__": 1061 app.run(main) 1062