Coverage for bookbuilderpy/git.py: 66%
115 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-17 23:15 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-17 23:15 +0000
1"""Tools for interacting with git."""
2import datetime
3import re
4from dataclasses import dataclass
5from shutil import rmtree
6from subprocess import TimeoutExpired # nosec
7from typing import Final
9from bookbuilderpy.logger import logger
10from bookbuilderpy.path import Path
11from bookbuilderpy.shell import shell
12from bookbuilderpy.strings import (
13 datetime_to_datetime_str,
14 enforce_non_empty_str,
15 enforce_non_empty_str_without_ws,
16 enforce_url,
17)
18from bookbuilderpy.types import type_error
19from bookbuilderpy.versions import TOOL_GIT, has_tool
22@dataclass(frozen=True, init=False, order=True)
23class Repo:
24 """An immutable record of a git repository."""
26 #: the repository path
27 path: Path
28 #: the repository url
29 url: str
30 #: the commit
31 commit: str
32 #: the date and time
33 date_time: str
35 def __init__(self,
36 path: Path,
37 url: str,
38 commit: str,
39 date_time: str):
40 """
41 Set up the information about a repository.
43 :param path: the path
44 :param url: the url
45 :param commit: the commit
46 :param date_time: the date and time
47 """
48 if not isinstance(path, Path):
49 raise type_error(path, "path", Path)
50 path.enforce_dir()
51 object.__setattr__(self, "path", path)
52 object.__setattr__(self, "url", enforce_url(url))
53 object.__setattr__(self, "commit",
54 enforce_non_empty_str_without_ws(commit))
55 if len(self.commit) != 40:
56 raise ValueError(f"Invalid commit: '{self.commit}'.")
57 try:
58 int(self.commit, 16)
59 except ValueError as e:
60 raise ValueError("Invalid commit information "
61 f"'{self.commit}' for repo '{url}'.") from e
62 object.__setattr__(self, "date_time",
63 enforce_non_empty_str(date_time))
64 logger(f"found repository in path '{self.path}' with commit "
65 f"'{self.commit}' for url '{self.url}' and "
66 f"date '{self.date_time}'.")
68 @staticmethod
69 def download(url: str,
70 dest_dir: str) -> "Repo":
71 """
72 Download a git repository.
74 :param url: the repository url
75 :param dest_dir: the destination directory
76 :return: the repository information
77 """
78 if not has_tool(TOOL_GIT):
79 raise ValueError(f"No '{TOOL_GIT}' installation found.")
81 dest: Final[Path] = Path.path(dest_dir)
82 dest.ensure_dir_exists()
83 url = enforce_url(url)
84 s = f" repository '{url}' to directory '{dest}'"
85 logger(f"starting to load{s} via '{TOOL_GIT}'.")
86 try:
87 shell([TOOL_GIT, "-C", dest, "clone",
88 "--depth", "1", url, dest], timeout=300,
89 cwd=dest)
90 except TimeoutExpired:
91 if url.startswith("https://github.com"):
92 url2 = enforce_url(f"ssh://git@{url[8:]}")
93 logger(f"timeout when loading url '{url}', so we try "
94 f"'{url2}' instead, but first delete '{dest}'.")
95 rmtree(dest, ignore_errors=True, onerror=None)
96 dest.ensure_dir_exists()
97 logger(f"'{dest}' deleted and created, now re-trying cloning.")
98 shell([TOOL_GIT, "-C", dest, "clone",
99 "--depth", "1", url2, dest], timeout=300,
100 cwd=dest)
101 else:
102 logger(f"timeout when loading url '{url}'.")
103 raise
104 logger(f"successfully finished loading{s}.")
106 return Repo.from_local(path=dest, url=url)
108 @staticmethod
109 def from_local(path: str,
110 url: str | None = None) -> "Repo":
111 """
112 Load all the information from an local repository.
114 :param path: the path to the repository
115 :param url: the url
116 :return: the repository information
117 """
118 if not has_tool(TOOL_GIT):
119 raise ValueError(f"No '{TOOL_GIT}' installation found.")
121 dest: Final[Path] = Path.path(path)
122 dest.enforce_dir()
124 logger(
125 f"checking commit information of repo '{dest}' via '{TOOL_GIT}'.")
126 stdout: str = enforce_non_empty_str(shell(
127 [TOOL_GIT, "-C", dest, "log", "--no-abbrev-commit", "-1"],
128 timeout=120, cwd=dest, wants_stdout=True))
130 match = re.search("^\\s*commit\\s+(.+?)\\s+", stdout,
131 flags=re.MULTILINE)
132 if match is None:
133 raise ValueError(
134 f"Did not find commit information in repo '{dest}'.")
135 commit: Final[str] = enforce_non_empty_str_without_ws(match.group(1))
136 match = re.search("^\\s*Date:\\s+(.+?)$", stdout, flags=re.MULTILINE)
137 if match is None:
138 raise ValueError(
139 f"Did not find date information in repo '{dest}'.")
140 date_str: Final[str] = enforce_non_empty_str(match.group(1))
141 date_raw: Final[datetime.datetime] = datetime.datetime.strptime(
142 date_str, "%a %b %d %H:%M:%S %Y %z")
143 if not isinstance(date_raw, datetime.datetime):
144 raise type_error(date_raw, "date_raw", datetime.datetime)
145 date_time: Final[str] = datetime_to_datetime_str(date_raw)
146 logger(f"found commit '{commit}' and date/time '{date_time}' "
147 f"for repo '{dest}'.")
149 if url is None:
150 logger(f"applying '{TOOL_GIT}' to get url information.")
151 url = enforce_non_empty_str(shell(
152 [TOOL_GIT, "-C", dest, "config", "--get", "remote.origin.url"],
153 timeout=120, cwd=dest, wants_stdout=True))
154 url = enforce_non_empty_str_without_ws(
155 url.strip().split("\n")[0].strip())
156 if url.endswith("/.git"):
157 url = enforce_non_empty_str_without_ws(f"{url[:-5]}.git")
158 if url.endswith("/"):
159 url = enforce_non_empty_str_without_ws(url[:-1])
160 logger(f"found url '{url}' for repo '{dest}'.")
161 if url.startswith("ssh://git@github.com"):
162 url = f"https://{url[10:]}"
164 return Repo(dest, url, commit, date_time)
166 def get_base_url(self) -> str:
167 """
168 Get the base url of this repository.
170 :return: the base url of this repository
171 """
172 base_url = self.url
173 base_url_lower = base_url.lower()
174 if base_url_lower.startswith("ssh://git@github."):
175 base_url = f"https://{enforce_non_empty_str(base_url[10:])}"
176 if base_url_lower.endswith(".git"):
177 base_url = enforce_non_empty_str(base_url[:-4])
178 return enforce_url(base_url)
180 def make_url(self, relative_path: str) -> str:
181 """
182 Make an url relative to this repository.
184 :param relative_path: the relative path
185 :return: the url
186 """
187 pt: Final[Path] = self.path.resolve_inside(relative_path)
188 pt.ensure_file_exists()
189 path: Final[str] = pt.relative_to(self.path)
191 base_url = self.get_base_url()
193 if "github.com" in base_url.lower():
194 base_url = f"{base_url}/blob/{self.commit}/{path}"
195 else:
196 base_url = f"{base_url}/{path}"
197 return enforce_url(base_url)
199 def get_name(self) -> str:
200 """
201 Get the name of this repository in the form 'user/name'.
203 :return: the name of this repository in the form 'user/name'.
204 """
205 base_url = self.url
206 if base_url.lower().endswith(".git"):
207 base_url = enforce_non_empty_str_without_ws(base_url[:-4])
208 si = base_url.rfind("/")
209 if si <= 0:
210 return base_url
211 si = max(0, base_url.rfind("/", 0, si - 1))
212 return enforce_non_empty_str(base_url[si + 1:].strip())