1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Stress test utility for repeating actions repeatedly on android devices.
16
17Configures multiple devices to simultaneously run through the same set of
18actions over and over, while keeping logs from various sources. Primarily
19designed for playing audio to the devices and scanning their log output for
20events, while running other adb commands in between.
21"""
22from __future__ import absolute_import
23from __future__ import division
24from __future__ import print_function
25
26import datetime
27from email import encoders
28from email.mime import text
29import email.mime.base as base
30import email.mime.multipart as multipart
31import logging
32import mimetypes
33import os
34import platform
35import re
36import shlex
37import signal
38import smtplib
39import socket
40import subprocess
41import sys
42import tempfile
43import threading
44import time
45import uuid
46import wave
47from absl import app
48from absl import flags
49import pexpect
50import queue
51import stress_test_common
52import stress_test_pb2
53from google.protobuf import text_format
54
55_SUMMARY_LINES = "-" * 73
56
57if sys.platform.startswith("win"):
58  pexpect = None
59
60_SUMMARY_COLUMNS = (
61    "|        Event Type       |      Event Count     | Consecutive no event |")
62_SUMMARY_COL_FORMATT = "|%-25.25s|% 22d|% 22d|"
63
64FLAGS = flags.FLAGS
65flags.DEFINE_string("notification_address", "",
66                    "Email address where to send notification events. Will "
67                    "default to $USER@google.com if not provided. No emails "
68                    "will be sent if suppress_notification_emails is True.")
69flags.DEFINE_bool("suppress_notification_emails", False,
70                  "Prevents emails from being sent as notifications if True.")
71flags.DEFINE_string("test_name", None,
72                    "Name of stress test to run. For example, if you set this "
73                    "to 'dsp_trigger_sw_rejection', the stress test in "
74                    "'stress_test.dsp_trigger_sw_rejection.ascii_proto' will "
75                    "be loaded and executed.")
76# flags.mark_flag_as_required("test_name")
77flags.DEFINE_string("output_root", "./",
78                    "Path where directory should be generated containing all "
79                    "logs from devices and moved files.")
80flags.DEFINE_integer("num_iterations", None,
81                     "If set to a positive number, the number of iterations of "
82                     "the stress test to run. Otherwise, the test runs "
83                     "forever.")
84flags.DEFINE_list("devices", [],
85                  "Serial numbers of devices that should be included in the "
86                  "stress test. If empty, all devices will be used.")
87flags.DEFINE_integer("print_summary_every_n", 10,
88                     "Prints the summary to the log file every n iterations.")
89
90flags.DEFINE_string("email_sender_address", "",
91                    "Account to use for sending notification emails.")
92flags.DEFINE_string("email_sender_password", "",
93                    "Password to use for notification email account.")
94flags.DEFINE_string("email_smtp_server", "smtp.gmail.com",
95                    "SMTP server to use for sending notification emails.")
96flags.DEFINE_integer("email_smtp_port", 465,
97                     "Port to use for the notification SMTP server.")
98flags.DEFINE_integer("device_settle_time", 5,
99                     "Time to wait for devices to settle.")
100flags.DEFINE_bool("use_sox", platform.system() != "Windows",
101                  "Use sox for playback, otherwise, attempt to use platform "
102                  "specific features.")
103flags.DEFINE_bool("attach_bugreport", True,
104                  "Attach bugreport to email if test failed.")
105flags.DEFINE_bool("delete_data_dir", False,
106                  "If true, code will delete all the files generated by this "
107                  "test at the end.")
108
109if platform.system().startswith("CYGWIN"):
110  FLAGS.device_settle_time = 30
111
112
113def QueueWorker(worker_queue):
114  while True:
115    work = worker_queue.get()
116    try:
117      work()
118    except:  # pylint:disable=bare-except
119      logging.exception("Exception in worker queue - task remains uncompleted.")
120    worker_queue.task_done()
121
122
123def SendNotificationEmail(subject, body, bugreport=None):
124  """Sends an email with the specified subject and body.
125
126     Also attach bugreport if bugreport location is provided as argument
127
128  Args:
129    subject: Subject of the email.
130    body: Body of the email.
131    bugreport: If provided, it will be attach to the email.
132  """
133  if FLAGS.suppress_notification_emails:
134    logging.info("Email with subject '%s' has been suppressed", subject)
135    return
136  try:
137    # Assemble the message to send.
138    recpient_address = FLAGS.notification_address
139    message = multipart.MIMEMultipart("alternative")
140    message["From"] = "Stress Test on %s" % socket.gethostname()
141    message["To"] = recpient_address
142    message["Subject"] = subject
143    message.attach(text.MIMEText(body, "plain"))
144    message.attach(text.MIMEText("<pre>%s</pre>" % body, "html"))
145
146    if FLAGS.attach_bugreport and bugreport:
147      # buildozer: disable=unused-variable
148      ctype, _ = mimetypes.guess_type(bugreport)
149      maintype, subtype = ctype.split("/", 1)
150      with open(bugreport, "rb") as fp:
151        att = base.MIMEBase(maintype, subtype)
152        att.set_payload(fp.read())
153        encoders.encode_base64(att)
154        att.add_header("Content-Disposition", "attachment", filename=bugreport)
155        message.attach(att)
156
157    # Send the message from our special account.
158    server = smtplib.SMTP_SSL(FLAGS.email_smtp_server, FLAGS.email_smtp_port)
159    server.login(FLAGS.email_sender_address, FLAGS.email_sender_password)
160    server.sendmail(FLAGS.email_sender_address, recpient_address,
161                    message.as_string())
162    server.quit()
163    logging.info("Email with subject '%s' has been sent", subject)
164  except:  # pylint:disable=bare-except
165    logging.exception("Failed to send notification email")
166
167
168class ProcessLogger(threading.Thread):
169
170  class EventScanner(object):
171
172    def __init__(self, name, process_name, regexes):
173      """Struct to store the data about an event.
174
175      Args:
176        name: Name of event.
177        process_name: Name of the process that is being logged.
178        regexes: An iteratable of regex strings that indicate an event has
179            happened.
180      """
181
182      self.name = name
183      self.process_name = process_name
184      self.searches = [re.compile(regex).search for regex in regexes]
185      self.count = 0
186
187    def ScanForEvent(self, line, lock=None):
188      """Checks the line for matches. If found, updates the internal counter."""
189
190      for search in self.searches:
191        if search(line.decode("utf-8")):
192          # Grab the lock (if provided), update the counter, and release it.
193          if lock: lock.acquire()
194          self.count += 1
195          if lock: lock.release()
196          logging.info("Event '%s' detected on %s", self.name,
197                       self.process_name)
198
199  def __init__(self, name, command, output, events,
200               restart_process, repeats_output_when_opened):
201    """Threaded class that monitors processes for events, and logs output.
202
203    Args:
204      name: The name of the process being logged.
205      command: A list of arguments to be passed to the subprocess to execute.
206      output: Name of output file to write process stdout to. If blank or None,
207          will not be generated.
208      events: An iterable of LoggingEventConfigs to look for in the output.
209      restart_process: Restart the process if it terminates by itself. This
210          should typically be true, but false for processes that only should be
211          run once and have their output logged.
212      repeats_output_when_opened: Set to true if the process will repeat the
213          output of a previous call when it is restarted. This will prevent
214          duplicate lines from being logged.
215    """
216    super(ProcessLogger, self).__init__()
217    self.name = name
218    self.command = command
219    self.restart_process = restart_process
220    self.repeats_output_when_opened = repeats_output_when_opened
221    self.process = None
222    self.lock = threading.Lock()
223    self.looking = False
224
225    # Compile the list of regexes that we're supposed to be looking for.
226    self.events = []
227    for event in events:
228      self.events.append(ProcessLogger.EventScanner(event.name, self.name,
229                                                    event.regex))
230
231    if output:
232      stress_test_common.MakeDirsIfNeeded(os.path.dirname(output))
233      self.output_fp = open(output, "w", encoding="utf-8")
234      logging.info("Logging device info to %s", output)
235    else:
236      self.output_fp = None
237
238  def GetEventCountsSinceLastCall(self):
239    """Returns the counts of all events since this method was last called."""
240    event_map = {}
241    self.lock.acquire()
242    for event in self.events:
243      event_map[event.name] = event.count
244      event.count = 0
245    self.lock.release()
246    return event_map
247
248  def run(self):
249    last_line = None
250    should_log = True
251    first_run = True
252    skip_exception_line = False
253    self.lock.acquire()
254    last_run_time = 0
255    while self.restart_process:
256      self.lock.release()
257      if not first_run:
258        logging.info("Restarting process %s", "".join(str(self.command)))
259        time_since_last_run = datetime.datetime.now() - last_run_time
260        if time_since_last_run.total_seconds() < 1.0:
261          needed_delay = 1.0 - time_since_last_run.total_seconds()
262          logging.info("Delaying for %.2f seconds", needed_delay)
263          time.sleep(needed_delay)
264      else:
265        first_run = False
266
267      try:
268        if pexpect:
269          self.process = pexpect.spawn(" ".join(self.command), timeout=None)
270          output_source = self.process
271        else:
272          self.process = subprocess.Popen(self.command, stdout=subprocess.PIPE)
273          output_source = self.process.stdout
274        last_run_time = datetime.datetime.now()
275        for line in output_source:
276          # If the process we're logging likes to repeat its output, we need to
277          # look for the last line we saw before we start doing anything with
278          # these lines anymore.
279          if self.repeats_output_when_opened:
280            if not should_log:
281              if last_line == line:
282                should_log = True
283              continue
284            elif skip_exception_line:
285              # ignore the last line which caused UnicodeEncodeError
286              skip_exception_line = False
287              continue
288
289          if self.output_fp:
290            self.output_fp.write(line.decode("utf-8", "backslashreplace").rstrip())
291            self.output_fp.write("\n")
292
293          # Loop through all events we're watching for, to see if they occur on
294          # this line. If they do, update the fact that we've seen this event.
295          for event in self.events:
296            if self.looking:
297              event.ScanForEvent(line, lock=self.lock)
298          last_line = line
299      except UnicodeEncodeError:
300        logging.exception("UnicodeEncodeError on running logger process")
301        skip_exception_line = True
302      except:  # pylint:disable=bare-except
303        logging.exception("Exception encountered running process")
304      finally:
305        if pexpect:
306          self.process.terminate()
307        else:
308          self.process.send_signal(signal.SIGTERM)
309        should_log = False
310      self.lock.acquire()
311
312    self.lock.release()
313    if pexpect:
314      if self.process.exitstatus is not None:
315        logging.info("Process finished - exit code %d", self.process.exitstatus)
316      else:
317        logging.info("Process finished - signal code %d",
318                     self.process.signalstatus)
319    else:
320      if self.process.returncode is not None:
321        logging.info("Process finished - return code %d",
322                     self.process.returncode)
323      else:
324        logging.info("Process finished - no return code")
325
326  def StopLogging(self):
327    if self.process:
328      self.lock.acquire()
329      self.restart_process = False
330      self.lock.release()
331
332      if pexpect:
333        self.process.kill(signal.SIGHUP)
334        self.process.kill(signal.SIGINT)
335      else:
336        self.process.send_signal(signal.SIGTERM)
337
338
339class Device(object):
340
341  SECONDS_TO_SLEEP_DURING_ROOT = 0.5
342
343  def __init__(self, serial_number, output_root, test_events, expected_result):
344    """Responsible for monitoring a specific device, and pulling files from it.
345
346    The actual work of the constructor will be handled asynchronously, you must
347    call WaitForTasks() before using the device.
348
349    Args:
350      serial_number: The device serial number.
351      output_root: The directory where to output log files/anything pulled from
352          the device.
353      test_events: The events (with conditions) that come from the StressTest
354          that should be evaluated at every iteration, along with a list of
355          actions to take when one of these events occur. For example, if there
356          have not been any detected hotword triggers, a bugreport can be
357          generated.
358      expected_result: Expected event count to pass the test.
359    """
360    self.serial_number = serial_number
361    self.output_root = output_root
362    self.cmd_string_replacements = {}
363    self.iteration = 0
364    self.cmd_string_replacements["iteration"] = 0
365    self.cmd_string_replacements["serial_number"] = serial_number
366    self.cmd_string_replacements["output_root"] = output_root
367    self.name = None
368    self.process_loggers = []
369    self.event_log = stress_test_pb2.EventLog()
370    self.cnt_per_iteration = expected_result
371
372    # Prepare the work queue, and offload the rest of the init into it.
373    self.work_queue = queue.Queue()
374    self.worker = threading.Thread(target=QueueWorker, args=[self.work_queue])
375    self.worker.daemon = True
376    self.worker.name = self.name
377    self.worker.start()
378    self.abort_requested = False
379    self.remove_device = False
380    self.test_events = test_events
381
382    self.work_queue.put(self.__init_async__)
383
384  def __init_async__(self):
385    # Get the device type, and append it to the serial number.
386    self.device_type = self.Command(["shell", "getprop",
387                                     "ro.product.name"]).strip().decode("utf-8")
388    self.name = "%s_%s" % (self.device_type, self.serial_number)
389    self.worker.name = self.name
390    self.cmd_string_replacements["device"] = self.name
391    logging.info("Setting up device %s", self.name)
392
393    config = stress_test_common.LoadDeviceConfig(self.device_type,
394                                                 self.serial_number)
395
396    # Get the device ready.
397    self.Root()
398
399    # Run any setup commands.
400    for cmd in config.setup_command:
401      result = self.Command(
402          shlex.split(cmd % self.cmd_string_replacements)).strip()
403      if result:
404        for line in result.splitlines():
405          logging.info(line)
406
407    self.files_to_move = config.file_to_move
408
409    self.event_names = set([event.name for event in config.event])
410    self.event_counter = {name: 0 for name in self.event_names}
411    self.iterations_since_event = {name: 0 for name in self.event_names}
412
413    for file_to_watch in config.file_to_watch:
414      # Are there any events that match up with this file?
415      events = [x for x in config.event if x.source == file_to_watch.source]
416
417      if file_to_watch.source == "LOGCAT":
418        command = [
419            "adb", "-s", self.serial_number, "logcat", "-v", "usec", ""
420        ]
421        command.extend(["%s:S" % tag for tag in config.tag_to_suppress])
422        name = "logcat_" + self.serial_number
423      else:
424        command = [
425            "adb", "-s", self.serial_number, "shell",
426            "while : ; do cat %s 2>&1; done" % file_to_watch.source
427        ]
428        name = "%s_%s" % (os.path.basename(
429            file_to_watch.source), self.serial_number)
430
431      process_logger = ProcessLogger(
432          name, command, os.path.join(
433              self.output_root,
434              file_to_watch.destination % self.cmd_string_replacements),
435          events, True, file_to_watch.repeats_output_on_open)
436      self.process_loggers.append(process_logger)
437      process_logger.start()
438
439    # Add any of the background processes.
440    for daemon_process in config.daemon_process:
441      # Are there any events that match up with this file?
442      events = [x for x in config.event if x.source == daemon_process.name]
443      command = shlex.split(
444          daemon_process.command % self.cmd_string_replacements)
445      if daemon_process.destination:
446        output = os.path.join(
447            self.output_root,
448            daemon_process.destination % self.cmd_string_replacements)
449      else:
450        output = None
451      name = "%s_%s" % (daemon_process.name, self.serial_number)
452      process_logger = ProcessLogger(name, command, output, events,
453                                     daemon_process.restart,
454                                     daemon_process.repeats_output_on_open)
455      self.process_loggers.append(process_logger)
456      process_logger.start()
457
458    # Build up the list of events we can actually process.
459    self.__UpdateEventCounters(number_of_iterations=0)
460    test_events = self.test_events
461    self.test_events = []
462    for event in test_events:
463      try:
464        eval(event.condition,  # pylint:disable=eval-used
465             {"__builtins__": None}, self.__ValuesInEval())
466        self.test_events.append(event)
467      except Exception as err:  # pylint:disable=broad-except
468        logging.error("Test event %s is not compatible with %s", event.name,
469                      self.name)
470        logging.error(str(err))
471    # Make sure that device specific events don't have conditions.
472    self.device_events = []
473    for event in config.test_event:
474      if not event.name:
475        logging.error("Device %s test event is missing a name", self.name)
476        continue
477      if event.condition:
478        self.test_events.append(event)
479      else:
480        self.device_events.append(event)
481
482  def StartLookingForEvents(self):
483    """Starts all child ProcessLoggers to start looking for events."""
484    for process_logger in self.process_loggers:
485      process_logger.looking = True
486
487  def __ValuesInEval(self):
488    values_in_eval = {key: value for key, value
489                      in list(self.event_counter.items())}
490    for key, value in list(self.iterations_since_event.items()):
491      values_in_eval["iterations_since_%s" % key] = value
492    return values_in_eval
493
494  def __GetExpectedEventCount(self, event):
495    if event == "logcat_iteration":
496      return -1
497    try:
498      event_cnt = getattr(self.cnt_per_iteration, event)
499    except AttributeError:
500      event_cnt = -1
501      logging.exception("%s is not an attribute of expected_result", event)
502    return event_cnt
503
504  def __UpdateEventCounters(self, number_of_iterations=1):
505    # Update the event counters
506    visited_events = set()
507    error_log = []
508    for process_logger in self.process_loggers:
509      events = process_logger.GetEventCountsSinceLastCall()
510      for event, count in list(events.items()):
511        # Print log when there is any missed event
512        expected_count = self.__GetExpectedEventCount(event)
513
514        if expected_count > 0:
515          if count > expected_count * number_of_iterations:
516            logging.info(
517                "[STRESS_TEST] In iteration %d, got duplicated %s : %d",
518                self.iteration, event, count)
519            logging.info("[STRESS_TEST] Will count only : %d",
520                         expected_count * number_of_iterations)
521            count = expected_count * number_of_iterations
522
523        if count:
524          self.event_counter[event] += count
525          visited_events.add(event)
526
527        if expected_count >= 0:
528          if expected_count * number_of_iterations != count:
529            error_log.append(
530                _SUMMARY_COL_FORMATT %
531                (event, count, expected_count * number_of_iterations))
532
533    # Go clear all the events that weren't consecutive.
534    for event in self.iterations_since_event:
535      if event in visited_events:
536        self.iterations_since_event[event] = 0
537      else:
538        self.iterations_since_event[event] += number_of_iterations
539
540    if error_log:
541      logging.info(_SUMMARY_LINES)
542      logging.info(" iteration %d : Something wrong in %s.",
543                   self.iteration, self.name)
544      logging.info(_SUMMARY_LINES)
545      logging.info(_SUMMARY_COLUMNS)
546      logging.info(_SUMMARY_LINES)
547      for line in error_log:
548        logging.info(line)
549      logging.info(_SUMMARY_LINES)
550
551  def ProcessEvents(self):
552    """Updates the event_counter and iterations_since_event maps."""
553    self.work_queue.put(self.__ProcessEventsAsync)
554
555  def __ProcessEventsAsync(self):
556    # Move any files to the local machine that should be moved.
557    if self.files_to_move:
558      for file_to_move in self.files_to_move:
559        try:
560          self.Command(["pull", file_to_move.source, file_to_move.destination])
561        except:  # pylint:disable=bare-except
562          logging.exception("Failed to pull %s", file_to_move.source)
563
564    self.__UpdateEventCounters()
565
566    for event in self.test_events:
567      if eval(event.condition,  # pylint:disable=eval-used
568              {"__builtins__": None}, self.__ValuesInEval()):
569        logging.info("Condition has been met for event '%s'", event.name)
570        # Write the updated event log.
571        event_log_details = self.event_log.event.add()
572        event_log_details.iteration = self.iteration
573        event_log_details.name = event.name
574        with open(os.path.join(self.output_root,
575                               "%s_event_log.ascii_proto" % self.name),
576                  "w") as fp:
577          text_format.PrintMessage(self.event_log, fp)
578
579        # Do whatever other actions that are part of the event.
580        self.__ProcessEventActionQueue(event)
581
582        # Run any device specific actions for this event.
583        for device_event in self.device_events:
584          if device_event.name == event.name:
585            self.__ProcessEventActionQueue(device_event)
586
587    # Set up the next iteration.
588    self.iteration += 1
589    self.cmd_string_replacements["iteration"] = self.iteration
590
591  def __ProcessEventActionQueue(self, event):
592    bugreport = None
593    for action in event.action:
594      if action == "BUGREPORT":
595        bugreport = self.TakeBugReport()
596      elif action.startswith("DUMPSYS "):
597        self.CaptureDumpsys(action[action.find(" ") + 1:])
598      elif action == "NOTIFY":
599        SendNotificationEmail(
600            "%s had event '%s' occur" % (self.name, event.name),
601            "\n".join(["Current Summary:"] + self.GetSummaryLines()), bugreport)
602      elif action == "REMOVE_DEVICE":
603        logging.info("Removing %s from the test", self.serial_number)
604        self.remove_device = True
605      elif action == "ABORT":
606        logging.info("Abort requested")
607        self.abort_requested = True
608      else:
609        action %= self.cmd_string_replacements
610        logging.info("Running command %s on %s", action, self.name)
611        result = self.Command(shlex.split(action)).strip()
612        if result:
613          for line in result.splitlines():
614            logging.info(line)
615
616  def Root(self):
617    self.Command(["root"])
618    time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT)
619    self.Command(["wait-for-device"])
620    time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT)
621
622  def Stop(self):
623    """Stops all file loggers attached to this device."""
624    for process_logger in self.process_loggers:
625      process_logger.StopLogging()
626    self.process_loggers = []
627
628  def Join(self):
629    for process_logger in self.process_loggers:
630      process_logger.join()
631    self.WaitForTasks()
632
633  def AsyncCommand(self, command, log_output=False):
634    self.work_queue.put(
635        lambda: self.__AsyncCommand(command, log_output=log_output))
636
637  def __AsyncCommand(self, command, log_output=False):
638    result = self.Command(command).strip()
639    if result and log_output:
640      # log both logcat and stress testing log
641      # some test will depend on adb command output (ex: dumpsys)
642      self.Command(['shell', 'log', '-t', 'STRESS_TEST', result])
643      for line in result.splitlines():
644        logging.info(line.decode("utf-8"))
645
646  def Command(self, command):
647    """Runs the provided command on this device."""
648    if command[0] in {"bugreport", "root", "wait-for-device", "shell",
649                      "logcat"}:
650      return subprocess.check_output(
651          ["adb", "-s", self.serial_number] + command)
652    elif command[0] == "DUMPSYS":
653      self.CaptureDumpsys(command[1])
654      return ""
655    elif command[0] == "pull":
656      try:
657        files = subprocess.check_output(
658            ["adb", "-s", self.serial_number, "shell", "ls", command[1]]
659        ).strip().splitlines()
660      except subprocess.CalledProcessError:
661        return ""
662      if len(files) == 1 and "No such file or directory" in files[0]:
663        return ""
664      for source_file in files:
665        destination = os.path.join(self.output_root,
666                                   command[2] % self.cmd_string_replacements)
667        stress_test_common.MakeDirsIfNeeded(os.path.dirname(destination))
668        logging.info("Moving %s from %s to %s", source_file, self.name,
669                     destination)
670        subprocess.check_output(["adb", "-s", self.serial_number, "pull",
671                                 source_file, destination])
672        if FLAGS.delete_data_dir:
673          subprocess.check_output([
674              "adb", "-s", self.serial_number, "shell", "rm", "-rf", source_file
675          ])
676        return ""
677    else:
678      return subprocess.check_output(command)
679
680  def TakeBugReport(self):
681    logging.info("Capturing bugreport on %s", self.name)
682    bugreport = os.path.join(self.output_root,
683                             "%s_bugreport_iteration_%06d.zip" %
684                             (self.name, self.iteration))
685    sdk = int(self.Command(
686        ["shell", "getprop", "ro.build.version.sdk"]).strip())
687    if sdk >= 24:  # SDK 24 = Android N
688      with open(bugreport, "wb") as bugreport_fp:
689        bugreport_fp.write(self.Command(["bugreport", bugreport]))
690    else:
691      bugreport_txt = os.path.join(self.output_root,
692                                   "%s_bugreport_iteration_%06d.txt" %
693                                   (self.name, self.iteration))
694      with open(bugreport_txt, "wb") as bugreport_fp:
695        bugreport_fp.write(self.Command(["bugreport"]))
696      self.Command(["zip", bugreport, bugreport_txt])
697
698    self.Command(["pull", "/data/anr/traces.txt",
699                  "%s_traces_iteration_%06d.txt" % (self.name, self.iteration)])
700    self.Command(["pull", "/data/anr/traces.txt.bugreport",
701                  "%s_traces_iteration_%06d.txt.bugreport" % (self.name,
702                                                              self.iteration)])
703    return bugreport
704
705  def CaptureDumpsys(self, dumpsys_unit):
706    logging.info("Taking dumpsys %s on %s", dumpsys_unit, self.name)
707    stress_test_common.MakeDirsIfNeeded(os.path.join(self.output_root,
708                                                     self.name))
709    with open(os.path.join(self.output_root, self.name,
710                           "%s_%06d.txt" % (dumpsys_unit, self.iteration)),
711              "w") as dumpsys_fp:
712      dumpsys_fp.write(self.Command(["shell", "dumpsys", dumpsys_unit]))
713
714  def WaitForTasks(self):
715    self.work_queue.join()
716
717  def GetSummaryLines(self):
718    lines = [
719        "Device {}".format(self.name),
720        _SUMMARY_LINES, _SUMMARY_COLUMNS, _SUMMARY_LINES
721    ]
722    for event, count in sorted(self.event_counter.items()):
723      lines.append(_SUMMARY_COL_FORMATT % (
724          event, count, self.iterations_since_event[event]))
725    lines.append(_SUMMARY_LINES)
726    return lines
727
728
729def RunAsyncCommand(devices, command):
730  """Helper function for running async commands on many devices."""
731  for device in devices:
732    device.AsyncCommand(command)
733  for device in devices:
734    device.WaitForTasks()
735
736
737class StressTest(object):
738  """Manages dispatching commands to devices/playing audio and events."""
739
740  def __init__(self, output_root, test_name):
741    self.output_root = output_root
742    self.devices = []
743    self.test_name = test_name
744    config = stress_test_pb2.StressTestConfig()
745    config_contents = stress_test_common.GetResourceContents(
746        os.path.join(stress_test_common.RESOURCE_DIR,
747                     "stress_test.%s.ascii_proto" % test_name))
748    text_format.Merge(config_contents, config)
749    self.events = config.event
750    self.setup_commands = config.setup_command
751    self.steps = config.step
752    self.audio_tempfiles = {}
753    self.uuid = str(uuid.uuid4())
754    self.expected_result = None
755    self.iteration = 0
756    if config.expected_result:
757      self.expected_result = config.expected_result[0]
758
759    # Place all the audio files into temp files.
760    for step in self.steps:
761      if step.audio_file and step.audio_file not in self.audio_tempfiles:
762        # We can't delete the temp file on windows, since it gets nuked too
763        # early.
764        audio_tempfile = tempfile.NamedTemporaryFile(
765            delete=(platform.system() != "Windows"),
766            dir="." if platform.system().startswith("CYGWIN") else None
767        )
768        if platform.system().startswith("CYGWIN"):
769          audio_tempfile.name = os.path.basename(audio_tempfile.name)
770        self.audio_tempfiles[step.audio_file] = audio_tempfile
771        if FLAGS.use_sox:
772          # Write out the raw PCM samples as a wave file.
773          audio_tempfile.write(
774              stress_test_common.GetResourceContents(step.audio_file))
775        else:
776          # Make a temporary wave file for playout if we can't use sox.
777          wavefile = wave.open(audio_tempfile, "wb")
778          if step.audio_file_sample_rate <= 0:
779            step.audio_file_sample_rate = 16000
780          wavefile.setframerate(step.audio_file_sample_rate)
781          if step.audio_file_num_channels <= 0:
782            step.audio_file_num_channels = 1
783          wavefile.setnchannels(step.audio_file_num_channels)
784          if not step.audio_file_format:
785            wavefile.setsampwidth(2)
786          elif step.audio_file_format == "s8":
787            wavefile.setsampwidth(1)
788          elif step.audio_file_format == "s16":
789            wavefile.setsampwidth(2)
790          elif step.audio_file_format == "s32":
791            wavefile.setsampwidth(4)
792          else:
793            raise RuntimeError(
794                "Unsupported wave file format for %s" % step.audio_file)
795          wavefile.writeframes(stress_test_common.GetResourceContents(
796              step.audio_file))
797          wavefile.close()
798        audio_tempfile.flush()
799
800        if platform.system() == "Windows":
801          audio_tempfile.close()
802
803    # Create all the devices that are attached to this machine.
804    for serial_number in self.GetActiveSerialNumbers():
805      self.devices.append(
806          Device(serial_number, output_root, self.events, self.expected_result))
807    if not self.devices:
808      raise app.UsageError("No devices connected")
809
810    self.devices.sort(key=lambda x: x.name)
811
812    # Make sure every device is done with their work for setup.
813    for device in self.devices:
814      device.WaitForTasks()
815
816    # Write out the info meta-data proto. Useful for doing analysis of the logs
817    # after the stress test has completed.
818    stress_test_info = stress_test_pb2.StressTestInfo()
819    stress_test_info.test_name = self.test_name
820    stress_test_info.test_description = config.description
821    stress_test_info.uuid = self.uuid
822    for device in self.devices:
823      device_pb = stress_test_info.device.add()
824      device_pb.device_type = device.device_type
825      device_pb.serial_number = device.serial_number
826
827    text_format.PrintMessage(stress_test_info, open(os.path.join(
828        self.output_root, "stress_test_info.ascii_proto"), "w"))
829
830  def GetActiveSerialNumbers(self):
831    serial_numbers = []
832    for line in sorted(
833        subprocess.check_output(["adb", "devices"]).splitlines()):
834      if line.endswith(b"device"):
835        serial_number = line.split()[0].strip()
836        if FLAGS.devices and serial_number not in FLAGS.devices:
837          continue
838        serial_numbers.append(serial_number.decode("utf-8"))
839    return serial_numbers
840
841  def Start(self):
842    logging.info("Waiting for devices to settle")
843    time.sleep(5)
844    # Make a copy of the device list, as we'll be modifying this actual list.
845    devices = list(self.devices)
846    dropped_devices = []
847
848    # If we have any setup commands, run them.
849    for command in self.setup_commands:
850      logging.info("Running command %s", command)
851      # Can't use the async command helper function since we need to get at
852      # the device cmd_string_replacements.
853      for device in devices:
854        device.AsyncCommand(
855            shlex.split(command % device.cmd_string_replacements),
856            log_output=True)
857      for device in devices:
858        device.WaitForTasks()
859
860    for device in devices:
861      device.StartLookingForEvents()
862      device.AsyncCommand(["shell", "log", "-t", "STRESS_TEST",
863                           "Starting {%s} TZ=$(getprop persist.sys.timezone) "
864                           "YEAR=$(date +%%Y)" % self.uuid], True)
865    self.iteration = 0
866    while True:
867      logging.info("Starting iteration %d", self.iteration)
868      # Perform all the actions specified in the test.
869      RunAsyncCommand(devices, [
870          "shell", "log", "-t", "STRESS_TEST",
871          "Performing iteration %d $(head -n 3 "
872          "/proc/timer_list | tail -n 1)" % self.iteration
873      ])
874
875      for step in self.steps:
876        if step.delay_before:
877          logging.info("Waiting for %.2f seconds", step.delay_before)
878          time.sleep(step.delay_before)
879
880        if step.audio_file:
881          logging.info("Playing %s", step.audio_file)
882          RunAsyncCommand(devices, ["shell", "log", "-t", "STRESS_TEST",
883                                    "Playing %s" % step.audio_file])
884
885          if FLAGS.use_sox:
886            subprocess.check_call(["sox", "-q",
887                                   self.audio_tempfiles[step.audio_file].name,
888                                   "-d"])
889          elif platform.system() == "Windows":
890            import winsound  # pylint:disable=g-import-not-at-top
891            winsound.PlaySound(self.audio_tempfiles[step.audio_file].name,
892                               winsound.SND_FILENAME | winsound.SND_NODEFAULT)
893          else:
894            raise app.RuntimeError("Unsupported platform for audio playback")
895
896        if step.command:
897          logging.info("Running command %s", step.command)
898          # Can't use the async command helper function since we need to get at
899          # the device cmd_string_replacements.
900          for device in devices:
901            device.AsyncCommand(
902                shlex.split(step.command % device.cmd_string_replacements),
903                log_output=True)
904          for device in devices:
905            device.WaitForTasks()
906
907        if step.delay_after:
908          logging.info("Waiting for %.2f seconds", step.delay_after)
909          time.sleep(step.delay_after)
910
911      RunAsyncCommand(devices, [
912          "shell", "log", "-t", "STRESS_TEST",
913          "Iteration %d complete $(head -n 3 "
914          "/proc/timer_list | tail -n 1)" % self.iteration
915      ])
916      self.iteration += 1
917
918      # TODO(somebody): Sometimes the logcat seems to get stuck and buffers for
919      # a bit. This throws off the event counts, so we should probably add some
920      # synchronization rules before we trigger any events.
921
922      # Go through each device, update the event counter, and see if we need to
923      # trigger any events.
924      devices_to_remove = []
925      abort_requested = False
926      active_devices = self.GetActiveSerialNumbers()
927      for device in devices:
928        if device.serial_number in active_devices:
929          device.ProcessEvents()
930        else:
931          logging.error("Dropped device %s", device.name)
932          SendNotificationEmail(
933              "Dropped device %s" % device.name,
934              "Device %s is not longer present in the system" % device.name)
935          dropped_devices.append(device)
936          devices_to_remove.append(device)
937
938      # Check to see if any of the dropped devices have come back. If yes, grab
939      # a bug report.
940      for device in dropped_devices:
941        if device.serial_number in active_devices:
942          logging.info("Device %s reappeared", device.name)
943          device.Root()
944          device.TakeBugReport()
945
946      dropped_devices = [d for d in dropped_devices
947                         if d.serial_number not in active_devices]
948
949      for device in devices:
950        device.WaitForTasks()
951        if device.remove_device:
952          devices_to_remove.append(device)
953        if device.abort_requested:
954          abort_requested = True
955
956      # Remove devices from our list of things to monitor if they've been marked
957      # for deletion.
958      if devices_to_remove:
959        for device in devices_to_remove:
960          device.Stop()
961        devices = [d for d in devices if d not in devices_to_remove]
962
963      # Print out the iteration summary.
964      if self.iteration % FLAGS.print_summary_every_n == 0:
965        for line in self.GetSummaryLines():
966          logging.info(line)
967
968      # See if we need to break out of the outer loop.
969      if abort_requested or not devices:
970        break
971      if FLAGS.num_iterations:
972        if self.iteration >= FLAGS.num_iterations:
973          logging.info("Completed full iteration : %d", self.iteration)
974          break
975    SendNotificationEmail(
976        "Stress test %s completed" % (FLAGS.test_name),
977        "\n".join(["Summary:"] + self.GetSummaryLines()))
978
979  def Stop(self):
980    logging.debug("Stopping devices")
981    for device in self.devices:
982      device.Stop()
983    for device in self.devices:
984      device.Join()
985
986  def GetSummaryLines(self):
987    lines = [
988        _SUMMARY_LINES,
989        "Conducted %d iterations out of %d" %
990        (self.iteration, FLAGS.num_iterations),
991        _SUMMARY_LINES
992    ]
993    for device in self.devices:
994      lines.extend(device.GetSummaryLines())
995    lines.append(_SUMMARY_LINES)
996    return lines
997
998
999def main(unused_argv):
1000  # Check to make sure that there are no other instances of ADB running - if
1001  # there are, print a warning and wait a bit for them to see it and decide if
1002  # they want to keep running, knowing that logs may be invalid.
1003  try:
1004    if "adb" in subprocess.check_output(["ps", "-ale"]).decode("utf-8"):
1005      print("It looks like there are other instances of adb running. If these "
1006            "other instances are also cating log files, you will not be "
1007            "capturing everything in this stress test (so logs will be "
1008            "invalid).")
1009      print("Continuing in 3...", end=" ")
1010      sys.stdout.flush()
1011      for i in [2, 1, 0]:
1012        time.sleep(1)
1013        if i:
1014          print("%d..." % i, end=" ")
1015        else:
1016          print("")
1017        sys.stdout.flush()
1018  except OSError:
1019    print("Unexpected error:", sys.exc_info()[0])
1020    if sys.platform.startswith("win"):
1021      pass
1022    else:
1023      raise
1024
1025  # Make the base output directory.
1026  output_root = os.path.join(FLAGS.output_root, "%s_%s" % (
1027      FLAGS.test_name, datetime.datetime.now().strftime("%Y%m%d_%H%M%S")))
1028  # output_root = os.path.join(FLAGS.output_root, FLAGS.test_name)
1029  stress_test_common.MakeDirsIfNeeded(output_root)
1030
1031  # Set up logging.
1032  formatter = logging.Formatter(
1033      "%(levelname)-1.1s %(asctime)s [%(threadName)-16.16s] %(message)s")
1034  root_logger = logging.getLogger()
1035  root_logger.setLevel(logging.INFO)
1036  root_logger.setLevel(logging.DEBUG)
1037
1038  file_handler = logging.FileHandler(os.path.join(output_root,
1039                                                  "stress_test.log"))
1040  file_handler.setFormatter(formatter)
1041  root_logger.addHandler(file_handler)
1042
1043  console_handler = logging.StreamHandler()
1044  console_handler.setFormatter(formatter)
1045  root_logger.addHandler(console_handler)
1046
1047  stress_test = StressTest(output_root, FLAGS.test_name)
1048  try:
1049    stress_test.Start()
1050  finally:
1051    logging.info("Stopping device logging threads")
1052    stress_test.Stop()
1053    for line in stress_test.GetSummaryLines():
1054      logging.info(line)
1055    if FLAGS.delete_data_dir:
1056      print("Deleting Data Dir")
1057      subprocess.check_output(["rm", "-r", "-f", output_root])
1058
1059
1060if __name__ == "__main__":
1061  app.run(main)
1062