# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module to update packages from GitHub archive."""

import json
import re
import urllib.request
import urllib.error
from typing import List, Optional, Tuple

import archive_utils
from base_updater import Updater
import git_utils
# pylint: disable=import-error
import updater_utils

# Matches release-asset and source-archive URLs hosted on GitHub.
#   group 1: owner, group 2: repository, group 3: URL flavour.
# The dot in the host name is escaped so that it only matches a literal '.',
# and the repository group accepts '.', which GitHub allows in repository
# names (e.g. "foo.js").
GITHUB_URL_PATTERN: str = (r'^https://github\.com/([-\w]+)/([-.\w]+)/' +
                           r'(releases/download/|archive/)')
GITHUB_URL_RE: re.Pattern = re.compile(GITHUB_URL_PATTERN)


def _edit_distance(str1: str, str2: str) -> int:
    """Returns the edit (Levenshtein) distance between two strings.

    Uses the row-by-row dynamic programming formulation, keeping only the
    previous row, so memory use is proportional to len(str2).

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        Minimum number of single-character insertions, deletions and
        substitutions needed to turn `str1` into `str2`.
    """
    # prev[j] is the distance between the processed prefix of str1 and
    # str2[:j]; initially the prefix is empty, so the distance is j.
    prev = list(range(len(str2) + 1))
    for i, chr1 in enumerate(str1):
        cur = [i + 1]
        for j, chr2 in enumerate(str2):
            if chr1 == chr2:
                cur.append(prev[j])
            else:
                # Deletion, substitution, insertion - whichever is cheapest.
                cur.append(min(prev[j + 1], prev[j], cur[j]) + 1)
        prev = cur
    return prev[len(str2)]


def choose_best_url(urls: List[str], previous_url: str) -> str:
    """Returns the best url to download from a list of candidate urls.

    This function calculates similarity between previous url and each of new
    urls. And returns the one best matches previous url.

    Similarity is measured by editing distance.

    Args:
        urls: Array of candidate urls.
        previous_url: String of the url used previously.

    Returns:
        One url from `urls`, or an empty string if `urls` is empty.
    """
    return min(urls,
               default="",
               key=lambda url: _edit_distance(url, previous_url))


class GithubArchiveUpdater(Updater):
    """Updater for archives from GitHub.

    This updater supports release archives in GitHub. Version is determined by
    release name in GitHub.
    """

    UPSTREAM_REMOTE_NAME: str = "update_origin"
    VERSION_FIELD: str = 'tag_name'
    # Both are populated by is_supported_url() from the old identifier's URL.
    owner: str
    repo: str

    def is_supported_url(self) -> bool:
        """Checks whether the old identifier is a GitHub archive URL.

        On success, `self.owner` and `self.repo` are set from the URL as a
        side effect; the other methods of this class rely on them.

        Returns:
            True if the identifier type is 'archive' (case-insensitive) and
            its value matches GITHUB_URL_RE, False otherwise.
        """
        if self._old_identifier.type.lower() != 'archive':
            return False
        match = GITHUB_URL_RE.match(self._old_identifier.value)
        if match is None:
            return False
        # The pattern always defines groups 1 and 2, so this cannot fail
        # once the match succeeded.
        self.owner, self.repo = match.group(1, 2)
        return True

    def _fetch_latest_release(self) -> Optional[Tuple[str, List[str]]]:
        """Queries the GitHub API for the latest release of the project.

        Returns:
            A tuple of the release's VERSION_FIELD value and the download
            urls of its assets that archive_utils recognises as supported
            archives, or None if the API answers with HTTP 404.

        Raises:
            urllib.error.HTTPError: For any HTTP error other than 404.
        """
        url = (f'https://api.github.com/repos/{self.owner}/{self.repo}'
               '/releases/latest')
        try:
            with urllib.request.urlopen(url) as response:
                data = json.loads(response.read().decode())
        except urllib.error.HTTPError as err:
            if err.code == 404:
                return None
            raise
        supported_assets = [
            a['browser_download_url'] for a in data['assets']
            if archive_utils.is_supported_archive(a['browser_download_url'])
        ]
        return data[self.VERSION_FIELD], supported_assets

    def setup_remote(self) -> None:
        """Points the UPSTREAM_REMOTE_NAME git remote at GitHub and fetches.

        The remote is added if it does not exist, re-created if it exists
        with a different url, and left alone if it already has the expected
        url. The remote is fetched in every case.
        """
        homepage = f'https://github.com/{self.owner}/{self.repo}'
        remotes = git_utils.list_remotes(self._proj_path)
        current_remote_url = None
        for name, url in remotes.items():
            if name == self.UPSTREAM_REMOTE_NAME:
                current_remote_url = url

        if current_remote_url != homepage:
            if current_remote_url is not None:
                # Remote exists but points elsewhere: drop it first.
                git_utils.remove_remote(self._proj_path,
                                        self.UPSTREAM_REMOTE_NAME)
            git_utils.add_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME,
                                 homepage)

        git_utils.fetch(self._proj_path, self.UPSTREAM_REMOTE_NAME)

    def _fetch_latest_tag(self) -> Tuple[str, List[str]]:
        """Determines the latest version from the tags of the git remote.

        This works on the remote set up by setup_remote() instead of the
        GitHub API, so it does not count against the API rate limit.

        Returns:
            A tuple of the tag selected by
            updater_utils.get_latest_stable_release_tag and an empty list
            (tags carry no release assets).
        """
        tags = git_utils.list_remote_tags(self._proj_path,
                                          self.UPSTREAM_REMOTE_NAME)
        parsed_tags = [updater_utils.parse_remote_tag(tag) for tag in tags]
        tag = updater_utils.get_latest_stable_release_tag(
            self._old_identifier.version, parsed_tags)
        return tag, []

    def _fetch_latest_version(self) -> None:
        """Checks upstream and gets the latest release tag."""
        # Fall back to git tags when the project has no GitHub release.
        self._new_identifier.version, urls = (self._fetch_latest_release()
                                              or self._fetch_latest_tag())

        # Adds source code urls.
        urls.append(f'https://github.com/{self.owner}/{self.repo}/archive/'
                    f'{self._new_identifier.version}.tar.gz')
        urls.append(f'https://github.com/{self.owner}/{self.repo}/archive/'
                    f'{self._new_identifier.version}.zip')

        self._new_identifier.value = choose_best_url(
            urls, self._old_identifier.value)

    def _fetch_latest_commit(self) -> None:
        """Checks upstream and gets the latest commit to default branch."""
        branch = git_utils.detect_default_branch(self._proj_path,
                                                 self.UPSTREAM_REMOTE_NAME)
        self._new_identifier.version = git_utils.get_sha_for_branch(
            self._proj_path, self.UPSTREAM_REMOTE_NAME + '/' + branch)
        self._new_identifier.value = (
            f'https://github.com/{self.owner}/{self.repo}/archive/'
            f'{self._new_identifier.version}.zip')

    def check(self) -> None:
        """Checks update for package.

        Returns nothing; the latest upstream version and its download url
        are stored in `self._new_identifier`. If the old version is a commit
        hash the latest commit of the default branch is used, otherwise the
        latest release or tag.
        """
        self.setup_remote()
        if git_utils.is_commit(self._old_identifier.version):
            self._fetch_latest_commit()
        else:
            self._fetch_latest_version()

    def update(self) -> None:
        """Updates the package.

        Has to call check() before this function.
        """
        try:
            temporary_dir = archive_utils.download_and_extract(
                self._new_identifier.value)
            package_dir = archive_utils.find_archive_root(temporary_dir)
            updater_utils.replace_package(package_dir, self._proj_path)
        finally:
            # The temporary directory is deliberately not removed, or it
            # would be impossible to debug a failed update. Only urllib's
            # own temporary files are cleaned up.
            urllib.request.urlcleanup()