1# Copyright (C) 2023 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import abc
15import dataclasses
16import enum
17import logging
18import os
19from pathlib import Path
20from typing import Callable, Iterable, TypeAlias
21
22import util
23from util import BuildType
24
25Action: TypeAlias = Callable[[], None]
26Verifier: TypeAlias = Callable[[], None]
27
28
29def de_src(p: Path) -> str:
30    return str(p.relative_to(util.get_top_dir()))
31
32
33def src(p: str) -> Path:
34    return util.get_top_dir().joinpath(p)
35
36
37class InWorkspace(enum.Enum):
38    """For a given file in the source tree, the counterpart in the symlink forest
39    could be one of these kinds.
40    """
41
42    SYMLINK = enum.auto()
43    NOT_UNDER_SYMLINK = enum.auto()
44    UNDER_SYMLINK = enum.auto()
45    OMISSION = enum.auto()
46
47    @staticmethod
48    def ws_counterpart(src_path: Path) -> Path:
49        return util.get_out_dir().joinpath("soong/workspace").joinpath(de_src(src_path))
50
51    def verifier(self, src_path: Path) -> Verifier:
52        @skip_for(BuildType.SOONG_ONLY)
53        def f():
54            ws_path = InWorkspace.ws_counterpart(src_path)
55            actual: InWorkspace | None = None
56            if ws_path.is_symlink():
57                actual = InWorkspace.SYMLINK
58                if not ws_path.exists():
59                    logging.warning("Dangling symlink %s", ws_path)
60            elif not ws_path.exists():
61                actual = InWorkspace.OMISSION
62            else:
63                for p in ws_path.parents:
64                    if not p.is_relative_to(util.get_out_dir()):
65                        actual = InWorkspace.NOT_UNDER_SYMLINK
66                        break
67                    if p.is_symlink():
68                        actual = InWorkspace.UNDER_SYMLINK
69                        break
70
71            if self != actual:
72                raise AssertionError(
73                    f"{ws_path} expected {self.name} but got {actual.name}"
74                )
75            logging.info(f"VERIFIED {de_src(ws_path)} {self.name}")
76
77        return f
78
79
80def skip_for(*build_types: util.BuildType):
81    def decorator(func: Callable[[], any]) -> Callable[[], any]:
82        def wrapper():
83            if util.CURRENT_BUILD_TYPE not in build_types:
84                return func()
85
86        return wrapper
87
88    return decorator
89
90
91@skip_for(BuildType.SOONG_ONLY)
92def verify_symlink_forest_has_only_symlink_leaves():
93    """Verifies that symlink forest has only symlinks or directories but no
94    files except for merged BUILD.bazel files"""
95
96    top_in_ws = InWorkspace.ws_counterpart(util.get_top_dir())
97
98    for root, _, files in os.walk(top_in_ws, topdown=True, followlinks=False):
99        for file in files:
100            if file == "soong_build_mtime" and top_in_ws.samefile(root):
101                continue
102            f = Path(root).joinpath(file)
103            if file != "BUILD.bazel" and not f.is_symlink():
104                raise AssertionError(f"{f} unexpected in symlink forest")
105
106    logging.info("VERIFIED Symlink Forest has no real files except BUILD.bazel")
107
108
109@dataclasses.dataclass(frozen=True)
110class CujStep:
111    verb: str
112    """a human-readable description"""
113    apply_change: Action
114    """user action(s) that are performed prior to a build attempt"""
115    verify: Verifier = verify_symlink_forest_has_only_symlink_leaves
116    """post-build assertions, i.e. tests.
117  Should raise `Exception` for failures.
118  """
119
120
121class CujGroup(abc.ABC):
122    """A sequence of steps to be performed, such that at the end of all steps the
123    initial state of the source tree is attained.
124    NO attempt is made to achieve atomicity programmatically. It is left as the
125    responsibility of the user.
126    """
127    def __init__(self, description: str):
128        self._desc = description
129
130    @property
131    def description(self)-> str:
132        return self._desc
133
134    @abc.abstractmethod
135    def get_steps(self) -> Iterable[CujStep]:
136        pass
137
138
139def sequence(*vs: Callable[[], None]) -> Callable[[], None]:
140    def f():
141        for v in vs:
142            v()
143
144    return f
145