# Source code for bookbuilderpy.git

"""Tools for interacting with git."""
import datetime
import re
from dataclasses import dataclass
from shutil import rmtree
from subprocess import TimeoutExpired  # nosec
from typing import Final

from bookbuilderpy.logger import logger
from bookbuilderpy.path import Path
from bookbuilderpy.shell import shell
from bookbuilderpy.strings import (
    datetime_to_datetime_str,
    enforce_non_empty_str,
    enforce_non_empty_str_without_ws,
    enforce_url,
)
from bookbuilderpy.types import type_error
from bookbuilderpy.versions import TOOL_GIT, has_tool


@dataclass(frozen=True, init=False, order=True)
class Repo:
    """
    An immutable record of a git repository.

    A record stores the local directory of the repository, the url it was
    obtained from, the full 40-character commit id, and the date and time
    of that commit as string.
    """

    #: the repository path
    path: Path
    #: the repository url
    url: str
    #: the commit
    commit: str
    #: the date and time
    date_time: str

    def __init__(self, path: Path, url: str, commit: str, date_time: str):
        """
        Set up the information about a repository.

        The record is frozen, so all fields are stored via
        `object.__setattr__`.

        :param path: the path, must be an existing directory
        :param url: the url
        :param commit: the commit, must be 40 hexadecimal digits
        :param date_time: the date and time
        :raises ValueError: if `commit` does not consist of exactly 40
            hexadecimal digits
        """
        if not isinstance(path, Path):
            raise type_error(path, "path", Path)
        path.enforce_dir()
        object.__setattr__(self, "path", path)
        object.__setattr__(self, "url", enforce_url(url))
        object.__setattr__(self, "commit",
                           enforce_non_empty_str_without_ws(commit))
        if len(self.commit) != 40:
            raise ValueError(f"Invalid commit: '{self.commit}'.")
        # int(commit, 16) would also accept a sign, a "0x" prefix, and "_"
        # separators, so we test for plain hexadecimal digits instead.
        if re.fullmatch(r"[0-9a-fA-F]+", self.commit) is None:
            raise ValueError("Invalid commit information "
                             f"'{self.commit}' for repo '{url}'.")
        object.__setattr__(self, "date_time",
                           enforce_non_empty_str(date_time))
        logger(f"found repository in path '{self.path}' with commit "
               f"'{self.commit}' for url '{self.url}' and "
               f"date '{self.date_time}'.")

    @staticmethod
    def download(url: str, dest_dir: str) -> "Repo":
        """
        Download a git repository.

        A shallow clone (`--depth 1`) is made into `dest_dir`. If cloning
        from a `https://github.com` url times out, the destination is
        deleted and re-created and the clone is attempted once more via
        the corresponding `ssh://git@` url. For all other urls, the
        timeout is re-raised.

        :param url: the repository url
        :param dest_dir: the destination directory
        :return: the repository information
        :raises ValueError: if no git installation is found
        :raises TimeoutExpired: if cloning times out and no fallback applies
        """
        if not has_tool(TOOL_GIT):
            raise ValueError(f"No '{TOOL_GIT}' installation found.")
        dest: Final[Path] = Path.path(dest_dir)
        dest.ensure_dir_exists()
        url = enforce_url(url)
        s = f" repository '{url}' to directory '{dest}'"
        logger(f"starting to load{s} via '{TOOL_GIT}'.")
        try:
            shell([TOOL_GIT, "-C", dest, "clone", "--depth", "1", url, dest],
                  timeout=300, cwd=dest)
        except TimeoutExpired:
            if url.startswith("https://github.com"):
                # url[8:] drops the leading "https://"
                url2 = enforce_url(f"ssh://git@{url[8:]}")
                logger(f"timeout when loading url '{url}', so we try "
                       f"'{url2}' instead, but first delete '{dest}'.")
                # get rid of whatever the aborted clone left behind
                rmtree(dest, ignore_errors=True)
                dest.ensure_dir_exists()
                logger(
                    f"'{dest}' deleted and created, now re-trying cloning.")
                shell([TOOL_GIT, "-C", dest, "clone", "--depth", "1", url2,
                       dest], timeout=300, cwd=dest)
            else:
                logger(f"timeout when loading url '{url}'.")
                raise
        logger(f"successfully finished loading{s}.")
        # the record keeps the url that was originally requested
        return Repo.from_local(path=dest, url=url)

    @staticmethod
    def from_local(path: str, url: str | None = None) -> "Repo":
        """
        Load all the information from a local repository.

        The commit id and the commit date are parsed from the output of
        `git log --no-abbrev-commit -1`. If no url is given, it is read
        from the `remote.origin.url` setting of the repository.

        :param path: the path to the repository
        :param url: the url, or `None` to query it from the repository
        :return: the repository information
        :raises ValueError: if no git installation is found or if the
            commit or date information cannot be found in the git output
        """
        if not has_tool(TOOL_GIT):
            raise ValueError(f"No '{TOOL_GIT}' installation found.")
        dest: Final[Path] = Path.path(path)
        dest.enforce_dir()

        logger(
            f"checking commit information of repo '{dest}' via '{TOOL_GIT}'.")
        stdout: str = enforce_non_empty_str(shell(
            [TOOL_GIT, "-C", dest, "log", "--no-abbrev-commit", "-1"],
            timeout=120, cwd=dest, wants_stdout=True))

        match = re.search(r"^\s*commit\s+(.+?)\s+", stdout,
                          flags=re.MULTILINE)
        if match is None:
            raise ValueError(
                f"Did not find commit information in repo '{dest}'.")
        commit: Final[str] = enforce_non_empty_str_without_ws(match.group(1))

        match = re.search(r"^\s*Date:\s+(.+?)$", stdout, flags=re.MULTILINE)
        if match is None:
            raise ValueError(
                f"Did not find date information in repo '{dest}'.")
        date_str: Final[str] = enforce_non_empty_str(match.group(1))
        # strptime either returns a datetime or raises a ValueError
        date_raw: Final[datetime.datetime] = datetime.datetime.strptime(
            date_str, "%a %b %d %H:%M:%S %Y %z")
        date_time: Final[str] = datetime_to_datetime_str(date_raw)
        logger(f"found commit '{commit}' and date/time '{date_time}' "
               f"for repo '{dest}'.")

        if url is None:
            logger(f"applying '{TOOL_GIT}' to get url information.")
            url = enforce_non_empty_str(shell(
                [TOOL_GIT, "-C", dest, "config", "--get",
                 "remote.origin.url"],
                timeout=120, cwd=dest, wants_stdout=True))
            # only the first line of the output is used
            url = enforce_non_empty_str_without_ws(
                url.strip().split("\n")[0].strip())
            if url.endswith("/.git"):
                # turn ".../name/.git" into ".../name.git"
                url = enforce_non_empty_str_without_ws(f"{url[:-5]}.git")
            if url.endswith("/"):
                url = enforce_non_empty_str_without_ws(url[:-1])
            logger(f"found url '{url}' for repo '{dest}'.")
            if url.startswith("ssh://git@github.com"):
                # url[10:] drops the leading "ssh://git@"
                url = f"https://{url[10:]}"

        return Repo(dest, url, commit, date_time)

    def get_base_url(self) -> str:
        """
        Get the base url of this repository.

        A leading `ssh://git@github.` is replaced by `https://github.` and
        a trailing `.git` is removed; both checks ignore case.

        :return: the base url of this repository
        """
        base_url = self.url
        base_url_lower = base_url.lower()
        if base_url_lower.startswith("ssh://git@github."):
            # base_url[10:] drops the leading "ssh://git@"
            base_url = f"https://{enforce_non_empty_str(base_url[10:])}"
        if base_url_lower.endswith(".git"):
            base_url = enforce_non_empty_str(base_url[:-4])
        return enforce_url(base_url)

    def make_url(self, relative_path: str) -> str:
        """
        Make an url relative to this repository.

        For base urls containing `github.com`, the url points into the
        `blob` view of the commit of this record. Otherwise, the relative
        path is appended directly to the base url.

        :param relative_path: the relative path, must identify an existing
            file inside the repository directory
        :return: the url
        """
        pt: Final[Path] = self.path.resolve_inside(relative_path)
        pt.ensure_file_exists()
        path: Final[str] = pt.relative_to(self.path)

        base_url = self.get_base_url()
        if "github.com" in base_url.lower():
            base_url = f"{base_url}/blob/{self.commit}/{path}"
        else:
            base_url = f"{base_url}/{path}"
        return enforce_url(base_url)

    def get_name(self) -> str:
        """
        Get the name of this repository in the form 'user/name'.

        The name is made up of the last two `/`-separated segments of the
        url, with trailing slashes and a trailing `.git` removed. If the
        url has fewer segments, it is returned as a whole.

        :return: the name of this repository in the form 'user/name'.
        """
        # a trailing slash must not be mistaken for the segment separator
        base_url = self.url.rstrip("/")
        if base_url.lower().endswith(".git"):
            base_url = enforce_non_empty_str_without_ws(base_url[:-4])
        last = base_url.rfind("/")
        if last <= 0:
            return base_url
        # If there is no earlier slash, rfind returns -1 and the slice
        # below then starts at 0, i.e., the complete string is kept.
        prev = base_url.rfind("/", 0, last - 1)
        return enforce_non_empty_str(base_url[prev + 1:].strip())