1# Copyright (C) 2022 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import csv
15import enum
16import functools
17import logging
18import os
19import statistics
20import subprocess
21import tempfile
22from io import StringIO
23from pathlib import Path
24from string import Template
25from typing import NewType, TypeVar, Iterable
26from typing import Optional
27
28Row = NewType("Row", dict[str, str])
29
30N = TypeVar("N", int, float)
31
32
class Aggregation(enum.Enum):
    """Named aggregation functions that reduce a series of numbers to one."""

    # A plain Python function assigned in an Enum body is treated as a method
    # rather than a member, so every function is boxed in a one-element tuple.
    AVG = (statistics.mean,)
    MAX = (max,)
    MEDIAN = (statistics.median,)
    MIN = (min,)
    STDEV = (statistics.stdev,)

    def fn(self, xs: Iterable[N]) -> N:
        """Applies this member's aggregation function to `xs`."""
        (aggregate,) = self.value  # unbox the singleton tuple
        return aggregate(xs)
44
45
46def _is_numeric(summary_row: Row) -> Optional[bool]:
47    for k, v in summary_row.items():
48        if k not in ("cuj", "targets"):
49            if ":" in v:
50                # presence of ':'  signifies a time field
51                return False
52            elif v.isnumeric():
53                return True
54    return None  # could not make a decision
55
56
def prepare_script(
    summary_csv_data: str, output: Path, filter: bool = True
) -> Optional[str]:
    """Builds the gnuplot script that plots the given summary CSV.

    The script is produced by filling in "plot_metrics.template.txt", which is
    read from the directory of this file. The parent directory of `output` is
    created as a side effect when there is data to plot.

    Args:
      summary_csv_data: CSV text; its first line is the header.
      output: path of the plot to produce; its suffix (without the dot) is
        used as the gnuplot terminal.
      filter: when true, rows whose "cuj" contains "rebuild" or "WARMUP" are
        left out of the plot.

    Returns:
      The gnuplot script, or None if there is no data row to plot.
    """
    reader: csv.DictReader = csv.DictReader(StringIO(summary_csv_data))
    fieldnames = reader.fieldnames
    if not fieldnames:
        # empty input has no header line at all (fieldnames is None)
        logging.warning("No data to plot")
        return None

    lines: list[str] = [",".join(fieldnames)]
    isnum = None

    for summary_row in reader:
        if isnum is None:
            isnum = _is_numeric(summary_row)
        # a missing "cuj" column or cell must not break the substring checks
        cuj = summary_row.get("cuj") or ""
        if filter and ("rebuild" in cuj or "WARMUP" in cuj):
            continue
        # fall back to 0 if a value is missing for plotting
        lines.append(",".join(v or "0" for v in summary_row.values()))

    if len(lines) <= 1:
        logging.warning("No data to plot")
        return None

    template_file = Path(os.path.dirname(__file__)).joinpath(
        "plot_metrics.template.txt"
    )
    with open(template_file, "r") as fp:
        script_template = Template(fp.read())

    os.makedirs(output.parent, exist_ok=True)
    column_count = len(fieldnames)

    return script_template.substitute(
        column_count=column_count,
        data="\n".join(lines),
        output=output,
        term=output.suffix[1:],  # assume terminal = output suffix, e.g. png, svg
        # 160 units of width per 4 lines of data, but never below 640
        width=max(160 * ((len(lines) + 4) // 4), 640),
        # an undecided row type (None) is plotted as time
        ydata="# default to num" if isnum else "time",
    )
94
95
96def _with_line_num(script: str) -> str:
97    return "".join(
98        f"{i + 1:2d}:{line}" for i, line in enumerate(script.splitlines(keepends=True))
99    )
100
101
102@functools.cache
103def _gnuplot_available() -> bool:
104    has_gnuplot = (
105        subprocess.run(
106            "gnuplot --version",
107            shell=True,
108            check=False,
109            stdout=subprocess.DEVNULL,
110            stderr=subprocess.DEVNULL,
111            text=True,
112        ).returncode
113        == 0
114    )
115    if not has_gnuplot:
116        logging.warning("gnuplot unavailable")
117    return has_gnuplot
118
119
def plot(summary_csv_data: str, output: Path, filter: bool) -> None:
    """Plots the summary CSV by running gnuplot on a generated script.

    Does nothing if gnuplot is unavailable or if there is no data to plot.
    gnuplot failures are logged as errors, not raised.

    Args:
      summary_csv_data: CSV text to plot.
      output: path of the plot to produce.
      filter: passed on to prepare_script to decide whether rows are filtered
        out before plotting.
    """
    if not _gnuplot_available():
        return
    script = prepare_script(summary_csv_data, output, filter)
    if script is None:
        return  # no data to plot, probably due to the filter
    with tempfile.NamedTemporaryFile("w+t") as gnuplot:
        gnuplot.write(script)
        # flush so that the gnuplot process reads the complete script
        gnuplot.flush()
        p = subprocess.run(
            args=["gnuplot", gnuplot.name],
            shell=False,
            check=False,
            capture_output=True,
            text=True,
        )
        logging.debug("GnuPlot script:\n%s", script)
        if p.returncode:
            # numbered script lines help to locate the line gnuplot reports
            logging.error("GnuPlot errors:\n%s\n%s", p.stderr, _with_line_num(script))
        else:
            logging.info("See %s\n%s", output, p.stdout)
141