1# Copyright (C) 2018 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module to update packages from GitHub archive."""
15
16import json
17import re
18import urllib.request
19import urllib.error
20from typing import List, Optional, Tuple
21
22import archive_utils
23from base_updater import Updater
24import git_utils
25# pylint: disable=import-error
26import updater_utils
# Matches GitHub release-asset and source-archive download urls.
#   group 1: repository owner
#   group 2: repository name
#   group 3: 'releases/download/' or 'archive/'
# The dot in the host name is escaped so that only the literal "github.com"
# matches, and the repository group accepts dots because GitHub repository
# names may contain them (e.g. "three.js").
GITHUB_URL_PATTERN: str = (r'^https:\/\/github\.com\/([-\w]+)\/([-\w.]+)\/' +
                           r'(releases\/download\/|archive\/)')
GITHUB_URL_RE: re.Pattern = re.compile(GITHUB_URL_PATTERN)
30
31
def _edit_distance(str1: str, str2: str) -> int:
    """Returns the edit distance between two strings.

    The distance is the minimum number of single-character insertions,
    deletions and substitutions needed to turn `str1` into `str2`. Only one
    row of the dynamic-programming table is kept at a time.

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        The edit distance between `str1` and `str2`.
    """
    # Row for the empty prefix of str1: reaching str2[:col] from '' takes
    # `col` insertions.
    above = list(range(len(str2) + 1))
    for row, char_a in enumerate(str1, start=1):
        # Column 0: reaching '' from str1[:row] takes `row` deletions.
        current = [row]
        for col, char_b in enumerate(str2, start=1):
            if char_a == char_b:
                # Matching characters cost nothing; reuse the diagonal.
                cost = above[col - 1]
            else:
                cost = 1 + min(above[col], above[col - 1], current[col - 1])
            current.append(cost)
        above = current
    return above[-1]
43
44
def choose_best_url(urls: List[str], previous_url: str) -> str:
    """Picks the candidate url that most resembles the previously used one.

    Every candidate is scored by its edit distance to `previous_url`; the
    candidate with the smallest distance wins. When several candidates share
    the smallest distance, the one that appears first in `urls` is returned.

    Args:
        urls: Candidate urls to choose from.
        previous_url: The url that was used previously.

    Returns:
        One url from `urls`, or an empty string if `urls` is empty.
    """

    def distance_to_previous(candidate: str) -> int:
        return _edit_distance(candidate, previous_url)

    return min(urls, key=distance_to_previous, default="")
63
64
class GithubArchiveUpdater(Updater):
    """Updater for archives from GitHub.

    When the current version is a commit, the head of the upstream default
    branch is tracked. Otherwise the new version is the `tag_name` of the
    latest GitHub release, or, when the releases API answers 404, the tag
    selected by updater_utils.get_latest_stable_release_tag().

    `owner` and `repo` are only populated by a successful call to
    is_supported_url(); the other methods read them.
    """

    # Name of the git remote that is pointed at the upstream GitHub project.
    UPSTREAM_REMOTE_NAME: str = "update_origin"
    # Key of the release JSON object that is used as the version.
    VERSION_FIELD: str = 'tag_name'
    # Owner and repository name parsed from the old identifier's url.
    owner: str
    repo: str

    def is_supported_url(self) -> bool:
        """Checks whether the old identifier is a GitHub archive url.

        On success the owner and the repository name found in the url are
        stored in `self.owner` and `self.repo`.

        Returns:
            True if the identifier type is 'archive' (case-insensitive) and
            its value matches GITHUB_URL_RE, False otherwise.
        """
        if self._old_identifier.type.lower() != 'archive':
            return False
        match = GITHUB_URL_RE.match(self._old_identifier.value)
        if match is None:
            return False
        # The pattern always defines both groups, so reading them from a
        # successful match cannot fail.
        self.owner, self.repo = match.group(1, 2)
        return True

    def _fetch_latest_release(self) -> Optional[Tuple[str, List[str]]]:
        """Asks the GitHub API for the latest release of the project.

        Returns:
            A tuple of the release's VERSION_FIELD value and the download
            urls of the release assets that archive_utils reports as
            supported archives, or None if the API answers 404.

        Raises:
            urllib.error.HTTPError: For any HTTP error other than 404.
        """
        # pylint: disable=line-too-long
        url = f'https://api.github.com/repos/{self.owner}/{self.repo}/releases/latest'
        try:
            with urllib.request.urlopen(url) as response:
                data = json.loads(response.read().decode())
        except urllib.error.HTTPError as err:
            if err.code == 404:
                return None
            raise
        asset_urls = (asset['browser_download_url'] for asset in data['assets'])
        supported_assets = [
            asset_url for asset_url in asset_urls
            if archive_utils.is_supported_archive(asset_url)
        ]
        return data[self.VERSION_FIELD], supported_assets

    def setup_remote(self) -> None:
        """Points the upstream remote at the GitHub project and fetches it.

        The remote named UPSTREAM_REMOTE_NAME is added if it is missing, and
        removed and re-added if it currently points at a different url.
        """
        homepage = f'https://github.com/{self.owner}/{self.repo}'
        remotes = git_utils.list_remotes(self._proj_path)
        current_remote_url = remotes.get(self.UPSTREAM_REMOTE_NAME)

        if current_remote_url != homepage:
            if current_remote_url is not None:
                # The remote exists but points somewhere else: replace it.
                git_utils.remove_remote(self._proj_path,
                                        self.UPSTREAM_REMOTE_NAME)
            git_utils.add_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME,
                                 homepage)

        git_utils.fetch(self._proj_path, self.UPSTREAM_REMOTE_NAME)

    def _fetch_latest_tag(self) -> Tuple[str, List[str]]:
        """Selects the latest stable release tag from the upstream remote.

        We want to avoid hitting GitHub API rate limit by using alternative
        solutions, so the tags are read through git.

        Returns:
            A tuple of the selected tag and an empty list. The empty list
            keeps the return shape identical to _fetch_latest_release(); no
            release asset urls are collected on this path.
        """
        tags = git_utils.list_remote_tags(self._proj_path,
                                          self.UPSTREAM_REMOTE_NAME)
        parsed_tags = [updater_utils.parse_remote_tag(tag) for tag in tags]
        tag = updater_utils.get_latest_stable_release_tag(
            self._old_identifier.version, parsed_tags)
        return tag, []

    def _fetch_latest_version(self) -> None:
        """Checks upstream and gets the latest release tag.

        Stores the new version in the new identifier, then stores the
        candidate url that is closest to the old identifier's url.
        """
        self._new_identifier.version, urls = (self._fetch_latest_release()
                                              or self._fetch_latest_tag())

        # Adds source code urls. The .tar.gz candidate stays ahead of the
        # .zip one so that ties in choose_best_url() resolve as before.
        archive_prefix = (
            f'https://github.com/{self.owner}/{self.repo}/archive/'
            f'{self._new_identifier.version}')
        urls.extend([f'{archive_prefix}.tar.gz', f'{archive_prefix}.zip'])

        self._new_identifier.value = choose_best_url(
            urls, self._old_identifier.value)

    def _fetch_latest_commit(self) -> None:
        """Checks upstream and gets the latest commit to default branch.

        Stores the commit SHA as the new version and the matching .zip
        source archive url as the new identifier value.
        """
        branch = git_utils.detect_default_branch(self._proj_path,
                                                 self.UPSTREAM_REMOTE_NAME)
        self._new_identifier.version = git_utils.get_sha_for_branch(
            self._proj_path, self.UPSTREAM_REMOTE_NAME + '/' + branch)
        self._new_identifier.value = (
            # pylint: disable=line-too-long
            f'https://github.com/{self.owner}/{self.repo}/archive/{self._new_identifier.version}.zip'
        )

    def check(self) -> None:
        """Checks update for package.

        Nothing is returned: the latest upstream version and its archive url
        are stored in the new identifier. The latest commit is looked up when
        the old version is a commit, the latest release or tag otherwise.
        """
        self.setup_remote()
        if git_utils.is_commit(self._old_identifier.version):
            self._fetch_latest_commit()
        else:
            self._fetch_latest_version()

    def update(self) -> None:
        """Updates the package.

        Has to call check() before this function.
        """
        try:
            temporary_dir = archive_utils.download_and_extract(
                self._new_identifier.value)
            package_dir = archive_utils.find_archive_root(temporary_dir)
            updater_utils.replace_package(package_dir, self._proj_path)
        finally:
            # The extracted temporary directory is deliberately left on disk:
            # removing it would make a failed update impossible to debug.
            urllib.request.urlcleanup()
182