1# Copyright (C) 2020 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module to check updates from crates.io."""
15
16import json
17import os
18from pathlib import Path
19import re
20import shutil
21import tempfile
22import urllib.request
23from typing import IO
24
25import archive_utils
26from base_updater import Updater
27import git_utils
28# pylint: disable=import-error
29import metadata_pb2  # type: ignore
30import updater_utils
31
# A crate name: word characters and hyphens, captured as one group.
LIBRARY_NAME_PATTERN: str = r"([-\w]+)"

# Any version string containing x.y.z-alpha... or x.y.z-beta...
ALPHA_BETA_PATTERN: str = r"^.*[0-9]+\.[0-9]+\.[0-9]+-(alpha|beta).*"

ALPHA_BETA_RE: re.Pattern = re.compile(ALPHA_BETA_PATTERN)

# Match both x.y.z and x.y.z+a.b.c which is used by some Vulkan binding
# libraries.
VERSION_PATTERN: str = r"([0-9]+)\.([0-9]+)\.([0-9]+)(\+([0-9]+)\.([0-9]+)\.([0-9]+))?"

VERSION_RE: re.Pattern = re.compile(VERSION_PATTERN)

# Static archive URL: group 1 and 2 are the crate name, group 3 the version.
# The dots in the host name and in the ".crate" suffix are escaped so that
# they only match a literal "." rather than any character.
CRATES_IO_ARCHIVE_URL_PATTERN: str = (r"^https:\/\/static\.crates\.io\/crates\/" +
                                      LIBRARY_NAME_PATTERN + "/" +
                                      LIBRARY_NAME_PATTERN + "-" +
                                      "(.*?)" + r"\.crate")

CRATES_IO_ARCHIVE_URL_RE: re.Pattern = re.compile(CRATES_IO_ARCHIVE_URL_PATTERN)

# A single-line `description = "..."` entry; group 1 keeps the quotes.
DESCRIPTION_PATTERN: str = r"^description *= *(\".+\")"

DESCRIPTION_RE: re.Pattern = re.compile(DESCRIPTION_PATTERN)
53
54
class CratesUpdater(Updater):
    """Updater for crates.io packages."""

    # Name of the git remote that setup_remote() registers and fetches.
    UPSTREAM_REMOTE_NAME: str = "update_origin"
    # URL of the .crate archive to download; assigned by check(),
    # _find_latest_non_test_version() or set_new_version_to_old().
    download_url: str
    # Crate name captured from the archive URL in is_supported_url().
    package: str
    # Result of archive_utils.find_archive_root() on the downloaded archive;
    # assigned by update() and read by rollback().
    package_dir: str
    # Temporary file created by update(); rollback() checks whether it is
    # non-empty to decide if the package has already been swapped.
    temp_file: IO
63
64    def is_supported_url(self) -> bool:
65        match = CRATES_IO_ARCHIVE_URL_RE.match(self._old_identifier.value)
66        if match is None:
67            return False
68        self.package = match.group(1)
69        return True
70
71    def setup_remote(self) -> None:
72        url = "https://crates.io/api/v1/crates/" + self.package
73        with urllib.request.urlopen(url) as request:
74            data = json.loads(request.read().decode())
75        homepage = data["crate"]["repository"]
76        remotes = git_utils.list_remotes(self._proj_path)
77        current_remote_url = None
78        for name, url in remotes.items():
79            if name == self.UPSTREAM_REMOTE_NAME:
80                current_remote_url = url
81
82        if current_remote_url is not None and current_remote_url != homepage:
83            git_utils.remove_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME)
84            current_remote_url = None
85
86        if current_remote_url is None:
87            git_utils.add_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME, homepage)
88
89        branch = git_utils.detect_default_branch(self._proj_path,
90                                                 self.UPSTREAM_REMOTE_NAME)
91        git_utils.fetch(self._proj_path, self.UPSTREAM_REMOTE_NAME, branch)
92
93    def _get_version_numbers(self, version: str) -> tuple[int, int, int]:
94        match = VERSION_RE.match(version)
95        if match is not None:
96            return (
97                int(match.group(1)),
98                int(match.group(2)),
99                int(match.group(3)),
100            )
101        return (0, 0, 0)
102
103    def _is_newer_version(self, prev_version: str, prev_id: int,
104                          check_version: str, check_id: int):
105        """Return true if check_version+id is newer than prev_version+id."""
106        return ((self._get_version_numbers(check_version), check_id) >
107                (self._get_version_numbers(prev_version), prev_id))
108
109    def _find_latest_non_test_version(self) -> None:
110        url = f"https://crates.io/api/v1/crates/{self.package}/versions"
111        with urllib.request.urlopen(url) as request:
112            data = json.loads(request.read().decode())
113        last_id = 0
114        self._new_identifier.version = ""
115        for v in data["versions"]:
116            version = v["num"]
117            if (not v["yanked"] and not ALPHA_BETA_RE.match(version) and
118                self._is_newer_version(
119                    self._new_identifier.version, last_id, version, int(v["id"]))):
120                last_id = int(v["id"])
121                self._new_identifier.version = version
122                self.download_url = "https://crates.io" + v["dl_path"]
123
124    def check(self) -> None:
125        """Checks crates.io and returns whether a new version is available."""
126        url = "https://crates.io/api/v1/crates/" + self.package
127        with urllib.request.urlopen(url) as request:
128            data = json.loads(request.read().decode())
129            self._new_identifier.version = data["crate"]["max_version"]
130        # Skip d.d.d-{alpha,beta}* versions
131        if ALPHA_BETA_RE.match(self._new_identifier.version):
132            print(f"Ignore alpha or beta release:{self.package}-{self._new_identifier.version}.")
133            self._find_latest_non_test_version()
134        else:
135            url = url + "/" + self._new_identifier.version
136            with urllib.request.urlopen(url) as request:
137                data = json.loads(request.read().decode())
138                self.download_url = "https://crates.io" + data["version"]["dl_path"]
139
140    def set_new_version_to_old(self):
141        super().refresh_without_upgrading()
142        # A shortcut to use the static download path.
143        self.download_url = f"https://static.crates.io/crates/{self.package}/" \
144                            f"{self.package}-{self._new_identifier.version}.crate"
145
146    def update(self) -> None:
147        """Updates the package.
148
149        Has to call check() before this function.
150        """
151        try:
152            temporary_dir = archive_utils.download_and_extract(self.download_url)
153            self.package_dir = archive_utils.find_archive_root(temporary_dir)
154            self.temp_file = tempfile.NamedTemporaryFile()
155            updater_utils.replace_package(self.package_dir, self._proj_path,
156                                          self.temp_file.name)
157            self.check_for_errors()
158        finally:
159            urllib.request.urlcleanup()
160
161    def rollback(self) -> bool:
162        # Only rollback if we have already swapped,
163        # which we denote by writing to this file.
164        if os.fstat(self.temp_file.fileno()).st_size > 0:
165            tmp_dir = tempfile.TemporaryDirectory()
166            shutil.move(self._proj_path, tmp_dir.name)
167            shutil.move(self.package_dir, self._proj_path)
168            shutil.move(Path(tmp_dir.name) / self.package, self.package_dir)
169            return True
170        return False
171
172    def update_metadata(self, metadata: metadata_pb2.MetaData) -> metadata_pb2:
173        """Updates METADATA content."""
174        # copy only HOMEPAGE url, and then add new ARCHIVE url.
175        updated_metadata = super().update_metadata(metadata)
176        for identifier in updated_metadata.third_party.identifier:
177            if identifier.version:
178                identifier.value = f"https://static.crates.io/crates/" \
179                                   f"{updated_metadata.name}/"\
180                                   f"{updated_metadata.name}" \
181                                   f"-{self.latest_identifier.version}.crate"
182                break
183        # copy description from Cargo.toml to METADATA
184        cargo_toml = os.path.join(self.project_path, "Cargo.toml")
185        description = self._get_cargo_description(cargo_toml)
186        if description and description != updated_metadata.description:
187            print("New METADATA description:", description)
188            updated_metadata.description = description
189        return updated_metadata
190
191    def check_for_errors(self) -> None:
192        # Check for .rej patches from failing to apply patches.
193        # If this has too many false positives, we could either
194        # check if the files are modified by patches or somehow
195        # track which files existed before the patching.
196        rejects = list(self._proj_path.glob('**/*.rej'))
197        if len(rejects) > 0:
198            print(f"Error: Found patch reject files: {str(rejects)}")
199            self._has_errors = True
200
201    def _toml2str(self, line: str) -> str:
202        """Convert a quoted toml string to a Python str without quotes."""
203        if line.startswith("\"\"\""):
204            return ""  # cannot handle broken multi-line description
205        # TOML string escapes: \b \t \n \f \r \" \\ (no unicode escape)
206        line = line[1:-1].replace("\\\\", "\n").replace("\\b", "")
207        line = line.replace("\\t", " ").replace("\\n", " ").replace("\\f", " ")
208        line = line.replace("\\r", "").replace("\\\"", "\"").replace("\n", "\\")
209        # replace a unicode quotation mark, used in the libloading crate
210        return line.replace("’", "'").strip()
211
212    def _get_cargo_description(self, cargo_toml: str) -> str:
213        """Return the description in Cargo.toml or empty string."""
214        if os.path.isfile(cargo_toml) and os.access(cargo_toml, os.R_OK):
215            with open(cargo_toml, "r", encoding="utf-8") as toml_file:
216                for line in toml_file:
217                    match = DESCRIPTION_RE.match(line)
218                    if match:
219                        return self._toml2str(match.group(1))
220        return ""
221