Coverage for texgit / repository / git.py: 75%
125 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-22 02:50 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-22 02:50 +0000
1"""Tools for interacting with repository."""
2import datetime
3from dataclasses import dataclass
4from re import MULTILINE, Pattern, search
5from re import compile as re_compile
6from shutil import rmtree, which
7from typing import Final, cast
9from pycommons.io.console import logger
10from pycommons.io.path import Path, file_path
11from pycommons.net.url import URL
12from pycommons.processes.shell import STREAM_CAPTURE, Command
13from pycommons.strings.enforce import (
14 enforce_non_empty_str,
15 enforce_non_empty_str_without_ws,
16)
17from pycommons.strings.string_conv import datetime_to_datetime_str
18from pycommons.types import type_error
def git() -> Path:
    """
    Get the path to the git executable.

    On the first call, the executable is looked up via
    :func:`shutil.which` and passed through `file_path`. The result is
    then stored as an attribute on this function object, so later calls
    return the cached value without searching again.

    :return: the path to the git executable
    :raises ValueError: if no git executable can be found
    """
    obj: Final[object] = git
    attr: Final[str] = "__the_path"
    # Return the cached path if an earlier call already resolved it.
    if hasattr(obj, attr):
        return cast("Path", getattr(obj, attr))

    path: str | None = which("git")
    if path is None:
        # The lookup was for "git", so the message must name "git".
        raise ValueError("Could not find 'git' installation.")
    result: Final[Path] = file_path(path)
    setattr(obj, attr, result)  # cache for subsequent calls
    return result
40#: the commit pattern
41_COMMIT: Final[Pattern] = re_compile(r"^\s*commit\s+(.+?)\s+", flags=MULTILINE)
42#: the date
43_DATE: Final[Pattern] = re_compile(r"^\s*Date:\s+(.+?)$", flags=MULTILINE)
def _get_base_url(url: str) -> URL:
    """
    Compute the normalized base url of a git repository.

    Two rewrites are applied, both decided on the lower-cased url:
    a url starting with ``ssh://git@github.`` gets its first ten
    characters (``ssh://git@``) replaced by ``https://``, and a trailing
    ``.git`` is cut off.

    :param url: the url of the repository
    :return: the base url of a git repository.
    """
    normalized: str = URL(url)
    lowered: Final[str] = str.lower(normalized)
    # Both decisions are taken up front; neither rewrite changes the
    # outcome of the other check.
    is_github_ssh: Final[bool] = lowered.startswith("ssh://git@github.")
    has_git_suffix: Final[bool] = lowered.endswith(".git")

    if is_github_ssh:
        # 10 == len("ssh://git@")
        remainder: Final[str] = enforce_non_empty_str(normalized[10:])
        normalized = f"https://{remainder}"
    if has_git_suffix:
        # 4 == len(".git")
        normalized = enforce_non_empty_str(normalized[:-4])
    return URL(normalized)
@dataclass(frozen=True, init=False, order=True)
class GitRepository:
    """
    An immutable record of a repository.

    A record holds the local directory of the repository, its normalized
    url, the commit id, and the date and time of that commit as string.
    Instances are created directly, via :meth:`download`, or via
    :meth:`from_local`.
    """

    #: the repository path
    path: Path
    #: the normalized repository url
    url: URL
    #: the commit
    commit: str
    #: the date and time
    date_time: str

    def __init__(self, path: Path, url: str, commit: str,
                 date_time: str) -> None:
        """
        Set up the information about a repository.

        :param path: the path, must be an existing directory
        :param url: the url, which is normalized via `_get_base_url`
        :param commit: the commit, a string of 40 hexadecimal characters
        :param date_time: the date and time, a non-empty string
        :raises TypeError: if `path` is not a `Path`
        :raises ValueError: if `commit` does not have 40 characters or
            cannot be parsed as a hexadecimal number
        """
        if not isinstance(path, Path):
            raise type_error(path, "path", Path)
        path.enforce_dir()
        # The dataclass is frozen, so the fields must be written through
        # object.__setattr__.
        object.__setattr__(self, "path", path)
        object.__setattr__(self, "url", _get_base_url(url))
        object.__setattr__(self, "commit",
                           enforce_non_empty_str_without_ws(commit))
        if len(self.commit) != 40:
            raise ValueError(f"Invalid commit: {self.commit!r}.")
        try:
            int(self.commit, 16)
        except ValueError as e:
            raise ValueError("Invalid commit information "
                             f"{self.commit!r} for repo {url!r}.") from e
        object.__setattr__(self, "date_time",
                           enforce_non_empty_str(date_time))
        logger(f"found repository in path {self.path!r} with commit "
               f"{self.commit!r} for url {self.url!r} and "
               f"date {self.date_time!r}.")

    @staticmethod
    def download(url: str, dest_dir: str) -> "GitRepository":
        """
        Download a git repository.

        The repository is cloned with ``--depth 1`` into `dest_dir`. If
        the clone command raises a :class:`ValueError` and the url starts
        with ``https://github.com``, the destination directory is deleted
        and re-created and the clone is retried once via the
        corresponding ``ssh://git@`` url.

        :param url: the repository url
        :param dest_dir: the destination directory
        :return: the repository information
        :raises ValueError: if cloning fails and no fallback applies, or
            if the fallback fails as well
        """
        dest: Final[Path] = Path(dest_dir)
        gt: Final[Path] = git()
        dest.ensure_dir_exists()
        url = URL(url)
        s = f" repository {url!r} to directory {dest!r}"
        logger(f"starting to load{s} via {gt!r}.")
        try:
            Command([
                gt, "-C", dest, "clone", "--depth", "1", url, dest],
                timeout=600, working_dir=dest).execute(True)
        except ValueError:
            if not url.startswith("https://github.com"):
                raise
            # 8 == len("https://"): swap the scheme for ssh access.
            url2 = URL(f"ssh://git@{url[8:]}")
            logger(f"timeout when loading url {url!r}, so we try "
                   f"{url2!r} instead, but first delete {dest!r}.")
            # Remove whatever the failed attempt left behind.
            rmtree(dest, ignore_errors=True)
            dest.ensure_dir_exists()
            logger(f"{dest!r} deleted and created, now re-trying cloning.")
            Command([
                gt, "-C", dest, "clone", "--depth", "1", url2, dest],
                timeout=600, working_dir=dest).execute(True)
        logger(f"successfully finished loading{s}.")

        # The record always carries the url that was originally requested.
        return GitRepository.from_local(path=dest, url=url)

    @staticmethod
    def from_local(path: str, url: str | None = None) -> "GitRepository":
        """
        Load all the information from a local repository.

        The commit id and the commit date are parsed from the output of
        ``git log --no-abbrev-commit -1``. If no `url` is given, it is
        taken from ``git config --get remote.origin.url``.

        :param path: the path to the repository
        :param url: the url, or `None` to query it from the repository
        :return: the repository information
        :raises ValueError: if the commit or the date cannot be found in
            the output of git, or if the date cannot be parsed
        """
        dest: Final[Path] = Path(path)
        gt: Final[Path] = git()
        dest.enforce_dir()

        logger(
            f"checking commit information of repo {dest!r} via {gt!r}.")
        stdout: str = enforce_non_empty_str(Command(
            [gt, "-C", dest, "log", "--no-abbrev-commit", "-1"],
            timeout=120, working_dir=dest, stdout=STREAM_CAPTURE).execute(
            True)[0])

        match = search(_COMMIT, stdout)
        if match is None:
            raise ValueError(
                f"Did not find commit information in repo {dest!r}.")
        commit: Final[str] = enforce_non_empty_str_without_ws(match.group(1))
        match = search(_DATE, stdout)
        if match is None:
            raise ValueError(
                f"Did not find date information in repo {dest!r}.")
        date_str: Final[str] = enforce_non_empty_str(match.group(1))
        # strptime either returns a datetime or raises ValueError, so no
        # further type check of the result is needed.
        date_raw: Final[datetime.datetime] = datetime.datetime.strptime(
            date_str, "%a %b %d %H:%M:%S %Y %z")
        date_time: Final[str] = datetime_to_datetime_str(date_raw)
        logger(f"found commit {commit!r} and date/time {date_time!r} "
               f"for repo {dest!r}.")

        if url is None:
            logger(f"applying {gt!r} to get url information.")
            url = enforce_non_empty_str(Command(
                [gt, "-C", dest, "config", "--get", "remote.origin.url"],
                timeout=120, working_dir=dest, stdout=STREAM_CAPTURE)
                .execute(True)[0])
            # Only the first line of the output is used.
            url = enforce_non_empty_str_without_ws(
                url.strip().split("\n")[0].strip())
            if url.endswith("/.git"):
                # turn ".../name/.git" into ".../name.git"
                url = enforce_non_empty_str_without_ws(f"{url[:-5]}.git")
            if url.endswith("/"):
                url = enforce_non_empty_str_without_ws(url[:-1])
            logger(f"found url {url!r} for repo {dest!r}.")
        if url.startswith("ssh://git@github.com"):
            # 10 == len("ssh://git@")
            url = f"https://{url[10:]}"

        return GitRepository(dest, url, commit, date_time)

    def make_url(self, path: str) -> URL:
        """
        Make a url relative to this git repository.

        If the host of the repository url is ``github.com``, the result
        has the form ``<url>/blob/<commit>/<relative path>``. Otherwise,
        it is ``<url>/<relative path>``.

        :param path: the absolute path
        :return: the url
        :raises ValueError: if `path` is neither an existing file nor an
            existing directory
        """
        pt: Final[Path] = Path(path)
        # presumably raises if pt is not inside self.path - see pycommons
        self.path.enforce_contains(pt)
        if not (pt.exists() and (pt.is_file() or pt.is_dir())):
            raise ValueError(
                f"Path {path!r} does not exist in {self}.")

        relative_path: Final[str] = pt.relative_to(self.path)
        url: Final[URL] = self.url
        return URL(f"{url}/blob/{self.commit}/{relative_path}"
                   if url.host == "github.com" else f"{url}/{relative_path}")

    def get_name(self) -> str:
        """
        Get the name of this git repository in the form 'user/name'.

        The name is the part of the url after the second-to-last slash,
        with a trailing ``.git`` removed. If the url has no such slash,
        the whole url is returned.

        :return: the name of this git repository in the form 'user/name'.
        """
        base_url: str = self.url
        if base_url.lower().endswith(".git"):
            base_url = enforce_non_empty_str_without_ws(base_url[:-4])
        last: Final[int] = base_url.rfind("/")
        if last <= 0:
            return base_url
        # Look for the slash in front of the 'user' part. If there is
        # none, rfind returns -1 and the slice below starts at index 0,
        # i.e., the complete string is kept instead of losing its first
        # character.
        prev: Final[int] = base_url.rfind("/", 0, last - 1)
        return enforce_non_empty_str(base_url[prev + 1:].strip())