# Copyright 2016 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Git helper functions.""" import os import re import sys _path = os.path.realpath(__file__ + '/../..') if sys.path[0] != _path: sys.path.insert(0, _path) del _path # pylint: disable=wrong-import-position import rh.utils def get_upstream_remote(): """Returns the current upstream remote name.""" # First get the current branch name. cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD'] result = rh.utils.run(cmd, capture_output=True) branch = result.stdout.strip() # Then get the remote associated with this branch. cmd = ['git', 'config', f'branch.{branch}.remote'] result = rh.utils.run(cmd, capture_output=True) return result.stdout.strip() def get_upstream_branch(): """Returns the upstream tracking branch of the current branch. Raises: Error if there is no tracking branch """ cmd = ['git', 'symbolic-ref', 'HEAD'] result = rh.utils.run(cmd, capture_output=True) current_branch = result.stdout.strip().replace('refs/heads/', '') if not current_branch: raise ValueError('Need to be on a tracking branch') cfg_option = 'branch.' + current_branch + '.' cmd = ['git', 'config', cfg_option + 'merge'] result = rh.utils.run(cmd, capture_output=True) full_upstream = result.stdout.strip() # If remote is not fully qualified, add an implicit namespace. if '/' not in full_upstream: full_upstream = f'refs/heads/{full_upstream}' cmd = ['git', 'config', cfg_option + 'remote'] result = rh.utils.run(cmd, capture_output=True) remote = result.stdout.strip() if not remote or not full_upstream: raise ValueError('Need to be on a tracking branch') return full_upstream.replace('heads', 'remotes/' + remote) def get_commit_for_ref(ref): """Returns the latest commit for this ref.""" cmd = ['git', 'rev-parse', ref] result = rh.utils.run(cmd, capture_output=True) return result.stdout.strip() def get_remote_revision(ref, remote): """Returns the remote revision for this ref.""" prefix = f'refs/remotes/{remote}/' if ref.startswith(prefix): return ref[len(prefix):] return ref def get_patch(commit): """Returns the patch for this commit.""" cmd = ['git', 'format-patch', '--stdout', '-1', commit] return rh.utils.run(cmd, capture_output=True).stdout def get_file_content(commit, path): """Returns the content of a file at a specific commit. We can't rely on the file as it exists in the filesystem as people might be uploading a series of changes which modifies the file multiple times. Note: The "content" of a symlink is just the target. So if you're expecting a full file, you should check that first. One way to detect is that the content will not have any newlines. """ cmd = ['git', 'show', f'{commit}:{path}'] return rh.utils.run(cmd, capture_output=True).stdout class RawDiffEntry(object): """Representation of a line from raw formatted git diff output.""" # pylint: disable=redefined-builtin def __init__(self, src_mode=0, dst_mode=0, src_sha=None, dst_sha=None, status=None, score=None, src_file=None, dst_file=None, file=None): self.src_mode = src_mode self.dst_mode = dst_mode self.src_sha = src_sha self.dst_sha = dst_sha self.status = status self.score = score self.src_file = src_file self.dst_file = dst_file self.file = file # This regular expression pulls apart a line of raw formatted git diff output. DIFF_RE = re.compile( r':(?P[0-7]*) (?P[0-7]*) ' r'(?P[0-9a-f]*)(\.)* (?P[0-9a-f]*)(\.)* ' r'(?P[ACDMRTUX])(?P[0-9]+)?\t' r'(?P[^\t]+)\t?(?P[^\t]+)?') def raw_diff(path, target): """Return the parsed raw format diff of target Args: path: Path to the git repository to diff in. target: The target to diff. Returns: A list of RawDiffEntry's. """ entries = [] cmd = ['git', 'diff', '--no-ext-diff', '-M', '--raw', target] diff = rh.utils.run(cmd, cwd=path, capture_output=True).stdout diff_lines = diff.strip().splitlines() for line in diff_lines: match = DIFF_RE.match(line) if not match: raise ValueError(f'Failed to parse diff output: {line}') rawdiff = RawDiffEntry(**match.groupdict()) rawdiff.src_mode = int(rawdiff.src_mode) rawdiff.dst_mode = int(rawdiff.dst_mode) rawdiff.file = (rawdiff.dst_file if rawdiff.dst_file else rawdiff.src_file) entries.append(rawdiff) return entries def get_affected_files(commit): """Returns list of file paths that were modified/added. Returns: A list of modified/added (and perhaps deleted) files """ return raw_diff(os.getcwd(), f'{commit}^-') def get_commits(ignore_merged_commits=False): """Returns a list of commits for this review.""" cmd = ['git', 'rev-list', f'{get_upstream_branch()}..'] if ignore_merged_commits: cmd.append('--first-parent') return rh.utils.run(cmd, capture_output=True).stdout.split() def get_commit_desc(commit): """Returns the full commit message of a commit.""" cmd = ['git', 'diff-tree', '-s', '--always', '--format=%B', commit] return rh.utils.run(cmd, capture_output=True).stdout def find_repo_root(path=None, outer=False): """Locate the top level of this repo checkout starting at |path|. Args: outer: Whether to find the outermost manifest, or the sub-manifest. """ if path is None: path = os.getcwd() orig_path = path path = os.path.abspath(path) # If we are working on a superproject instead of a repo client, use the # result from git directly. For regular repo client, this would return # empty string. cmd = ['git', 'rev-parse', '--show-superproject-working-tree'] git_worktree_path = rh.utils.run(cmd, cwd=path, capture_output=True).stdout.strip() if git_worktree_path: return git_worktree_path while not os.path.exists(os.path.join(path, '.repo')): path = os.path.dirname(path) if path == '/': raise ValueError(f'Could not locate .repo in {orig_path}') root = path if not outer and os.path.isdir(os.path.join(root, '.repo', 'submanifests')): # If there are submanifests, walk backward from path until we find the # corresponding submanifest root. abs_orig_path = os.path.abspath(orig_path) parts = os.path.relpath(abs_orig_path, root).split(os.path.sep) while parts and not os.path.isdir( os.path.join(root, '.repo', 'submanifests', *parts, 'manifests')): parts.pop() path = os.path.join(root, *parts) return path def is_git_repository(path): """Returns True if the path is a valid git repository.""" cmd = ['git', 'rev-parse', '--resolve-git-dir', os.path.join(path, '.git')] result = rh.utils.run(cmd, capture_output=True, check=False) return result.returncode == 0