# Copyright (C) 2023 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import dataclasses
import enum
import functools
import logging
import os
from pathlib import Path
from typing import Any, Callable, Iterable, TypeAlias

import util
from util import BuildType

Action: TypeAlias = Callable[[], None]
Verifier: TypeAlias = Callable[[], None]


def de_src(p: Path) -> str:
    """Return `p` as a string relative to `util.get_top_dir()`.

    Raises:
        ValueError: if `p` is not located under `util.get_top_dir()`
            (this is the behavior of `Path.relative_to`).
    """
    return str(p.relative_to(util.get_top_dir()))


def src(p: str) -> Path:
    """Return the path obtained by joining `p` onto `util.get_top_dir()`."""
    return util.get_top_dir().joinpath(p)


class InWorkspace(enum.Enum):
    """For a given file in the source tree, the counterpart in the symlink
    forest could be one of these kinds.
    """

    # The counterpart itself is a symlink (possibly a dangling one).
    SYMLINK = enum.auto()
    # The counterpart exists and no ancestor inside the out dir is a symlink.
    NOT_UNDER_SYMLINK = enum.auto()
    # The counterpart exists and is reached through a symlinked ancestor.
    UNDER_SYMLINK = enum.auto()
    # The counterpart does not exist.
    OMISSION = enum.auto()

    @staticmethod
    def ws_counterpart(src_path: Path) -> Path:
        """Return the path under `<out dir>/soong/workspace` mirroring `src_path`.

        `src_path` must be under `util.get_top_dir()`; see `de_src`.
        """
        workspace = util.get_out_dir().joinpath("soong/workspace")
        return workspace.joinpath(de_src(src_path))

    def verifier(self, src_path: Path) -> Verifier:
        """Build a verifier asserting that `src_path`'s counterpart is `self`.

        The returned callable classifies the workspace counterpart of
        `src_path` and raises `AssertionError` if the classification differs
        from this enum member. It does nothing when the current build type is
        `BuildType.SOONG_ONLY`.
        """

        @skip_for(BuildType.SOONG_ONLY)
        def f():
            ws_path = InWorkspace.ws_counterpart(src_path)
            actual: InWorkspace | None = None
            if ws_path.is_symlink():
                actual = InWorkspace.SYMLINK
                if not ws_path.exists():
                    # `exists()` follows the link, so this is a broken link.
                    logging.warning("Dangling symlink %s", ws_path)
            elif not ws_path.exists():
                actual = InWorkspace.OMISSION
            else:
                # Walk up from the file; stop once we leave the out dir or
                # hit a symlinked ancestor, whichever comes first.
                for p in ws_path.parents:
                    if not p.is_relative_to(util.get_out_dir()):
                        actual = InWorkspace.NOT_UNDER_SYMLINK
                        break
                    if p.is_symlink():
                        actual = InWorkspace.UNDER_SYMLINK
                        break

            if self is not actual:
                # `actual` stays None if the walk above never hit a `break`;
                # guard it so the failure is still reported as an
                # AssertionError rather than an AttributeError on `None.name`.
                actual_name = "<unclassified>" if actual is None else actual.name
                raise AssertionError(
                    f"{ws_path} expected {self.name} but got {actual_name}"
                )
            # NOTE(review): de_src() raises ValueError if ws_path is not under
            # util.get_top_dir() -- confirm the out dir always lives inside
            # the source tree.
            logging.info("VERIFIED %s %s", de_src(ws_path), self.name)

        return f


def skip_for(*build_types: util.BuildType):
    """Return a decorator that turns a function into a no-op for `build_types`.

    The build type is checked each time the decorated function is called, not
    at decoration time. When `util.CURRENT_BUILD_TYPE` is one of
    `build_types` the call returns `None` without invoking the function;
    otherwise the function's own result is returned.
    """

    def decorator(func: Callable[[], Any]) -> Callable[[], Any]:
        # Preserve the wrapped function's name and docstring for logs/debugging.
        @functools.wraps(func)
        def wrapper():
            if util.CURRENT_BUILD_TYPE not in build_types:
                return func()

        return wrapper

    return decorator


@skip_for(BuildType.SOONG_ONLY)
def verify_symlink_forest_has_only_symlink_leaves():
    """Verifies that symlink forest has only symlinks or directories but no
    files except for merged BUILD.bazel files"""

    top_in_ws = InWorkspace.ws_counterpart(util.get_top_dir())

    # followlinks=False: symlinked directories are not descended into.
    for root, _, files in os.walk(top_in_ws, topdown=True, followlinks=False):
        for file in files:
            # A `soong_build_mtime` file directly at the workspace root is
            # tolerated.
            if file == "soong_build_mtime" and top_in_ws.samefile(root):
                continue
            f = Path(root).joinpath(file)
            if file != "BUILD.bazel" and not f.is_symlink():
                raise AssertionError(f"{f} unexpected in symlink forest")

    logging.info("VERIFIED Symlink Forest has no real files except BUILD.bazel")


@dataclasses.dataclass(frozen=True)
class CujStep:
    """One step of a CUJ: a change to apply followed by a verification."""

    verb: str
    """a human-readable description"""
    apply_change: Action
    """user action(s) that are performed prior to a build attempt"""
    verify: Verifier = verify_symlink_forest_has_only_symlink_leaves
    """post-build assertions, i.e. tests.
    Should raise `Exception` for failures.
    """


class CujGroup(abc.ABC):
    """A sequence of steps to be performed, such that at the end of all steps
    the initial state of the source tree is attained.
    NO attempt is made to achieve atomicity programmatically. It is left as
    the responsibility of the user.
    """

    def __init__(self, description: str):
        self._desc = description

    @property
    def description(self) -> str:
        """The description this group was constructed with."""
        return self._desc

    @abc.abstractmethod
    def get_steps(self) -> Iterable[CujStep]:
        """Return the steps making up this group; subclasses must implement."""
        pass


def sequence(*vs: Callable[[], None]) -> Callable[[], None]:
    """Combine callables into one that invokes each of `vs` in order.

    An exception raised by any callable propagates and stops the sequence.
    """

    def f():
        for v in vs:
            v()

    return f