Coverage for bookbuilderpy/git.py: 66%

115 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-17 23:15 +0000

1"""Tools for interacting with git.""" 

2import datetime 

3import re 

4from dataclasses import dataclass 

5from shutil import rmtree 

6from subprocess import TimeoutExpired # nosec 

7from typing import Final 

8 

9from bookbuilderpy.logger import logger 

10from bookbuilderpy.path import Path 

11from bookbuilderpy.shell import shell 

12from bookbuilderpy.strings import ( 

13 datetime_to_datetime_str, 

14 enforce_non_empty_str, 

15 enforce_non_empty_str_without_ws, 

16 enforce_url, 

17) 

18from bookbuilderpy.types import type_error 

19from bookbuilderpy.versions import TOOL_GIT, has_tool 

20 

21 

@dataclass(frozen=True, init=False, order=True)
class Repo:
    """
    An immutable record of a git repository.

    A record bundles the local directory of the repository, its url, the
    full commit id, and the date/time string of that commit. The dataclass
    is frozen and has a hand-written constructor, so all fields are
    validated first and then stored via :func:`object.__setattr__`.
    """

    #: the repository path
    path: Path
    #: the repository url
    url: str
    #: the commit
    commit: str
    #: the date and time
    date_time: str

    def __init__(self,
                 path: Path,
                 url: str,
                 commit: str,
                 date_time: str):
        """
        Set up the information about a repository.

        :param path: the path, must be an instance of `Path`
        :param url: the url, passed through `enforce_url`
        :param commit: the commit id, exactly 40 hexadecimal digits
        :param date_time: the date and time, a non-empty string
        :raises ValueError: if the commit id does not have 40 characters
            or contains anything but hexadecimal digits
        """
        if not isinstance(path, Path):
            raise type_error(path, "path", Path)
        path.enforce_dir()
        # The dataclass is frozen, so plain attribute assignment would fail.
        object.__setattr__(self, "path", path)
        object.__setattr__(self, "url", enforce_url(url))
        object.__setattr__(self, "commit",
                           enforce_non_empty_str_without_ws(commit))
        if len(self.commit) != 40:
            raise ValueError(f"Invalid commit: '{self.commit}'.")
        # `int(text, 16)` is too lenient for validation: it also accepts a
        # "0x" prefix, a sign, "_" separators and non-ASCII digits. The
        # characters are therefore checked explicitly.
        if re.fullmatch("[0-9a-fA-F]+", self.commit) is None:
            raise ValueError("Invalid commit information "
                             f"'{self.commit}' for repo '{url}'.")
        object.__setattr__(self, "date_time",
                           enforce_non_empty_str(date_time))
        logger(f"found repository in path '{self.path}' with commit "
               f"'{self.commit}' for url '{self.url}' and "
               f"date '{self.date_time}'.")

    @staticmethod
    def download(url: str,
                 dest_dir: str) -> "Repo":
        """
        Download a git repository.

        The repository is cloned with depth 1 and a timeout of 300 seconds.
        If cloning a `https://github.com` url times out, the destination
        directory is wiped and the clone is retried once via the
        corresponding `ssh://git@` url. Timeouts for all other urls are
        logged and re-raised.

        :param url: the repository url
        :param dest_dir: the destination directory
        :return: the repository information
        :raises ValueError: if no git installation was found
        :raises TimeoutExpired: if cloning timed out and no retry applies,
            or if the retry timed out as well
        """
        if not has_tool(TOOL_GIT):
            raise ValueError(f"No '{TOOL_GIT}' installation found.")

        dest: Final[Path] = Path.path(dest_dir)
        dest.ensure_dir_exists()
        url = enforce_url(url)
        s = f" repository '{url}' to directory '{dest}'"
        logger(f"starting to load{s} via '{TOOL_GIT}'.")
        try:
            shell([TOOL_GIT, "-C", dest, "clone",
                   "--depth", "1", url, dest], timeout=300,
                  cwd=dest)
        except TimeoutExpired:
            if url.startswith("https://github.com"):
                # url[8:] cuts off the 8 characters of "https://".
                url2 = enforce_url(f"ssh://git@{url[8:]}")
                logger(f"timeout when loading url '{url}', so we try "
                       f"'{url2}' instead, but first delete '{dest}'.")
                # Remove whatever the aborted clone left behind.
                rmtree(dest, ignore_errors=True)
                dest.ensure_dir_exists()
                logger(f"'{dest}' deleted and created, now re-trying cloning.")
                shell([TOOL_GIT, "-C", dest, "clone",
                       "--depth", "1", url2, dest], timeout=300,
                      cwd=dest)
            else:
                logger(f"timeout when loading url '{url}'.")
                raise
        logger(f"successfully finished loading{s}.")

        # The record keeps the url that was originally requested, even if
        # the data was eventually fetched via the ssh fallback.
        return Repo.from_local(path=dest, url=url)

    @staticmethod
    def from_local(path: str,
                   url: str | None = None) -> "Repo":
        """
        Load all the information from a local repository.

        The commit id and the commit date are parsed from the output of
        `git log`. If no url is given, it is read from the git configuration
        entry `remote.origin.url` and normalized: a trailing `/.git` becomes
        `.git`, a trailing `/` is dropped, and a `ssh://git@github.com`
        prefix is rewritten to `https://github.com`.

        :param path: the path to the repository
        :param url: the url, or `None` to query it from the repository
        :return: the repository information
        :raises ValueError: if no git installation was found, or if the
            commit or date information cannot be found in the git output
        """
        if not has_tool(TOOL_GIT):
            raise ValueError(f"No '{TOOL_GIT}' installation found.")

        dest: Final[Path] = Path.path(path)
        dest.enforce_dir()

        logger(
            f"checking commit information of repo '{dest}' via '{TOOL_GIT}'.")
        # The output layout is pinned explicitly: the user's git settings
        # `format.pretty` and `log.date` could otherwise change the lines
        # and the date format that are parsed below.
        stdout: str = enforce_non_empty_str(shell(
            [TOOL_GIT, "-C", dest, "log", "--no-abbrev-commit",
             "--pretty=medium", "--date=default", "-1"],
            timeout=120, cwd=dest, wants_stdout=True))

        match = re.search("^\\s*commit\\s+(.+?)\\s+", stdout,
                          flags=re.MULTILINE)
        if match is None:
            raise ValueError(
                f"Did not find commit information in repo '{dest}'.")
        commit: Final[str] = enforce_non_empty_str_without_ws(match.group(1))
        match = re.search("^\\s*Date:\\s+(.+?)$", stdout, flags=re.MULTILINE)
        if match is None:
            raise ValueError(
                f"Did not find date information in repo '{dest}'.")
        date_str: Final[str] = enforce_non_empty_str(match.group(1))
        # Pattern for dates such as "Thu Jan 1 00:00:00 1970 +0000".
        date_raw: Final[datetime.datetime] = datetime.datetime.strptime(
            date_str, "%a %b %d %H:%M:%S %Y %z")
        date_time: Final[str] = datetime_to_datetime_str(date_raw)
        logger(f"found commit '{commit}' and date/time '{date_time}' "
               f"for repo '{dest}'.")

        if url is None:
            logger(f"applying '{TOOL_GIT}' to get url information.")
            url = enforce_non_empty_str(shell(
                [TOOL_GIT, "-C", dest, "config", "--get", "remote.origin.url"],
                timeout=120, cwd=dest, wants_stdout=True))
            # Only the first line of the output is used.
            url = enforce_non_empty_str_without_ws(
                url.strip().split("\n")[0].strip())
            if url.endswith("/.git"):
                url = enforce_non_empty_str_without_ws(f"{url[:-5]}.git")
            if url.endswith("/"):
                url = enforce_non_empty_str_without_ws(url[:-1])
            logger(f"found url '{url}' for repo '{dest}'.")
            if url.startswith("ssh://git@github.com"):
                # url[10:] cuts off the 10 characters of "ssh://git@".
                url = f"https://{url[10:]}"

        return Repo(dest, url, commit, date_time)

    def get_base_url(self) -> str:
        """
        Get the base url of this repository.

        A leading `ssh://git@github.` is replaced such that the url starts
        with `https://github.`, and a trailing `.git` is removed. Both
        checks are case-insensitive.

        :return: the base url of this repository
        """
        base_url = self.url
        base_url_lower = base_url.lower()
        if base_url_lower.startswith("ssh://git@github."):
            # base_url[10:] cuts off the 10 characters of "ssh://git@".
            base_url = f"https://{enforce_non_empty_str(base_url[10:])}"
        # Only the prefix was changed above, so the lower-case copy can
        # still be used for testing the suffix.
        if base_url_lower.endswith(".git"):
            base_url = enforce_non_empty_str(base_url[:-4])
        return enforce_url(base_url)

    def make_url(self, relative_path: str) -> str:
        """
        Make a url relative to this repository.

        For base urls containing `github.com`, the result has the form
        `<base>/blob/<commit>/<path>`, otherwise `<base>/<path>`.

        :param relative_path: the relative path, resolved inside the
            repository directory
        :return: the url
        """
        pt: Final[Path] = self.path.resolve_inside(relative_path)
        pt.ensure_file_exists()
        path: Final[str] = pt.relative_to(self.path)

        base_url = self.get_base_url()

        if "github.com" in base_url.lower():
            base_url = f"{base_url}/blob/{self.commit}/{path}"
        else:
            base_url = f"{base_url}/{path}"
        return enforce_url(base_url)

    def get_name(self) -> str:
        """
        Get the name of this repository in the form 'user/name'.

        The name consists of the last two `/`-separated components of the
        url, after a trailing `.git` has been removed.

        :return: the name of this repository in the form 'user/name'.
        """
        base_url = self.url
        if base_url.lower().endswith(".git"):
            base_url = enforce_non_empty_str_without_ws(base_url[:-4])
        last = base_url.rfind("/")
        if last <= 0:
            return base_url
        # `rfind` returns -1 if there is no earlier slash; adding 1 then
        # keeps the whole string instead of cutting off its first character.
        prev = base_url.rfind("/", 0, last - 1)
        return enforce_non_empty_str(base_url[prev + 1:].strip())