Coverage for bookbuilderpy/path.py: 86%

231 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-17 23:15 +0000

1"""The base class with the information of a build.""" 

2 

3import codecs 

4import gzip 

5import io 

6import os.path 

7import shutil 

8from re import MULTILINE 

9from re import compile as _compile 

10from typing import Final, Iterable, Pattern, cast 

11 

12from bookbuilderpy.strings import enforce_non_empty_str_without_ws, regex_sub 

13from bookbuilderpy.types import type_error 

14 

15 

def _canonicalize_path(path: str) -> str:
    """
    Validate a path string and turn it into its canonical form.

    Environment variables and a leading `~` are expanded first. Then
    symbolic links are resolved, the result is made absolute, and finally
    its case is normalized via `os.path.normcase`.

    :param path: the path to canonicalize
    :return: the canonicalized path
    :raises ValueError: if `path` is empty or if the canonicalization
        yields an empty string, `"."`, or `".."`
    """
    if not isinstance(path, str):
        raise type_error(path, "path", str)
    if not path:
        raise ValueError("Path must not be empty.")

    # Expand variables and the user home before touching the file system.
    expanded = os.path.expanduser(os.path.expandvars(path))
    canonical = os.path.normcase(
        os.path.abspath(os.path.realpath(expanded)))

    # Sanity checks on the result of the standard library calls.
    if not isinstance(canonical, str):
        raise type_error(canonical, "canonicalized path", str)
    if not canonical:
        raise ValueError("Canonicalization must yield non-empty string, "
                         f"but returned '{canonical}'.")
    if canonical in (".", ".."):
        raise ValueError(f"Canonicalization cannot yield '{canonical}'.")
    return canonical

41 

42 

def copy_pure(path_in: str, path_out: str) -> "Path":
    """
    Copy a file as-is, without any transformation of its contents.

    The copy is performed by `shutil.copyfile` and the destination it
    reports is verified to be an existing file.

    :param path_in: the path to the input file
    :param path_out: the path to the output file
    :returns: the path to the new file
    """
    copied = shutil.copyfile(path_in, path_out)
    return Path.file(str(copied))

52 

53 

def move_pure(path_in: str, path_out: str) -> "Path":
    """
    Move a file to a new location.

    The move is performed by `shutil.move`. Afterwards, the destination
    path is verified to identify an existing file.

    :param path_in: the path to the input file
    :param path_out: the path to the output file
    :returns: the path to the new file
    """
    target = Path.path(path_out)
    shutil.move(path_in, target)
    # The destination must be a file once the move has completed.
    target.enforce_file()
    return target

66 

67 

def _copy_un_gzip(path_in: str, path_out: str) -> "Path":
    """
    Copy a gzip-compressed file and decompress it on the way.

    The input is read through `gzip.open` and the decompressed bytes are
    written to the output file.

    :param path_in: the path to the (gzip-compressed) input file
    :param path_out: the path to the (uncompressed) output file
    :returns: the path to the new file
    """
    target = Path.path(path_out)
    with gzip.open(path_in, "rb") as packed:
        with open(target, "wb") as plain:
            shutil.copyfileobj(packed, plain)
    target.enforce_file()
    return target

81 

82 

#: a pattern used to clean up trailing white space
_PATTERN_TRAILING_WHITESPACE: Final[Pattern] = _compile(
    r"[ \t]+\n", MULTILINE)


#: the UTF-8 encoding ("utf-8-sig" also accepts and strips a leading BOM)
UTF8: Final[str] = "utf-8-sig"

#: The list of possible text encodings: each entry maps a group of byte
#: order marks to the name of the encoding they announce. UTF-32 is listed
#: before UTF-16 because the UTF-32-LE mark begins with the UTF-16-LE mark.
__ENCODINGS: Final[tuple[tuple[tuple[bytes, ...], str], ...]] = (
    ((codecs.BOM_UTF8,), UTF8),
    ((codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE), "utf-32"),
    ((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE), "utf-16"),
)

96 

97 

def _get_text_encoding(filename: str) -> str:
    """
    Detect the text encoding of a file from its byte order mark.

    Only the first four bytes of the file are inspected. If they start
    with one of the known byte order marks, the corresponding encoding is
    returned; otherwise, the UTF-8 default is used.

    Adapted from https://stackoverflow.com/questions/13590749.

    :param filename: the filename
    :return: the encoding
    """
    with open(filename, "rb") as stream:
        head = stream.read(4)  # four bytes cover the longest mark

    # The first encoding with a matching mark wins, in table order.
    return next(
        (name for marks, name in __ENCODINGS
         if any(head.startswith(mark) for mark in marks)),
        UTF8)

114 

115 

116class Path(str): 

117 """An immutable representation of a path.""" 

118 

119 #: the common path version of this path, if any 

120 __common: str | None 

121 #: the internal state: 0=don't know, 1=file, 2=dir 

122 __state: int 

123 

124 def __new__(cls, value): 

125 """ 

126 Construct the object. 

127 

128 :param value: the string value 

129 """ 

130 ret = super().__new__(cls, _canonicalize_path(value)) 

131 ret.__common = None 

132 ret.__state = 0 

133 return ret 

134 

135 def enforce_file(self) -> None: 

136 """ 

137 Enforce that a path references an existing file. 

138 

139 :raises ValueError: if `path` does not reference an existing file 

140 """ 

141 if self.__state == 0 and os.path.isfile(self): 

142 self.__state = 1 

143 if self.__state != 1: 

144 raise ValueError(f"Path '{self}' does not identify a file.") 

145 

146 def enforce_dir(self) -> None: 

147 """ 

148 Enforce that a path references an existing directory. 

149 

150 :raises ValueError: if `path` does not reference an existing directory 

151 """ 

152 if self.__state == 0 and os.path.isdir(self): 

153 self.__state = 2 

154 if self.__state != 2: 

155 raise ValueError(f"Path '{self}' does not identify a directory.") 

156 

157 def contains(self, other: str) -> bool: 

158 """ 

159 Check whether another path is contained in this path. 

160 

161 :param other: the other path 

162 :return: `True` is this path contains the other path, `False` if not 

163 """ 

164 if self == other: 

165 return True 

166 if not os.path.isdir(self): 

167 return False 

168 if self.__common is None: 

169 self.__common = os.path.commonpath([self]) 

170 return self.__common == os.path.commonpath([self, Path.path(other)]) 

171 

172 def enforce_contains(self, other: str) -> None: 

173 """ 

174 Raise an exception if this path does not contain the other path. 

175 

176 :param other: the other path 

177 :raises ValueError: if `other` is not a sub-path of this path 

178 """ 

179 self.enforce_dir() 

180 if not self.contains(other): 

181 raise ValueError(f"Path '{self}' does not contain '{other}'.") 

182 

183 def enforce_neither_contains(self, other: str) -> None: 

184 """ 

185 Enforce that neither path contains another one. 

186 

187 :param other: the other path 

188 :raises ValueError: if `other` is contained in this path or vice versa 

189 """ 

190 if self.__common is None: 

191 self.__common = os.path.commonpath([self]) 

192 opath: Final[Path] = Path.path(other) 

193 joint: Final[str] = os.path.commonpath([self, opath]) 

194 if joint == self.__common: 

195 raise ValueError(f"Path '{self}' contains '{opath}'.") 

196 if opath.__common is None: 

197 opath.__common = os.path.commonpath([opath]) 

198 if joint == opath.__common: 

199 raise ValueError(f"Path '{opath}' contains '{self}'.") 

200 

201 def relative_to(self, base_path: str) -> str: 

202 """ 

203 Compute a relative path of this path towards the given base path. 

204 

205 :param base_path: the string 

206 :return: a relative path 

207 :raises ValueError: if this path is not inside `base_path` 

208 """ 

209 opath: Final[Path] = Path.path(base_path) 

210 opath.enforce_contains(self) 

211 return enforce_non_empty_str_without_ws( 

212 os.path.relpath(self, opath)) 

213 

214 def resolve_inside(self, relative_path: str) -> "Path": 

215 """ 

216 Resolve a relative path to an absolute path inside this path. 

217 

218 :param relative_path: the path to resolve 

219 :return: the resolved child path 

220 :raises ValueError: If the path would resolve to something outside of 

221 this path and/or if it is empty. 

222 """ 

223 opath: Final[Path] = Path.path(os.path.join( 

224 self, enforce_non_empty_str_without_ws(relative_path))) 

225 self.enforce_contains(opath) 

226 return opath 

227 

228 def ensure_file_exists(self) -> bool: 

229 """ 

230 Atomically ensure that the file exists and create it otherwise. 

231 

232 :return: `True` if the file already existed and 

233 `False` if it was newly and atomically created. 

234 :raises: ValueError if anything goes wrong during the file creation 

235 """ 

236 existed: bool = False 

237 try: 

238 os.close(os.open(self, os.O_CREAT | os.O_EXCL)) 

239 except FileExistsError: 

240 existed = True 

241 except Exception as err: 

242 if isinstance(err, ValueError): 

243 raise 

244 raise ValueError( 

245 f"Error when trying to create file '{self}'.") from err 

246 self.enforce_file() 

247 return existed 

248 

249 def ensure_dir_exists(self) -> None: 

250 """Make sure that the directory exists, create it otherwise.""" 

251 try: 

252 os.makedirs(name=self, exist_ok=True) 

253 except FileExistsError: 

254 pass 

255 except Exception as err: 

256 if isinstance(err, ValueError): 

257 raise 

258 raise ValueError( 

259 f"Error when trying to create directory '{self}'.") from err 

260 self.enforce_dir() 

261 

262 def __open_for_read(self) -> io.TextIOWrapper: 

263 """ 

264 Open this file for reading. 

265 

266 :return: the file open for reading 

267 """ 

268 return cast(io.TextIOWrapper, open( # noqa 

269 self, encoding=_get_text_encoding(self), # noqa 

270 errors="strict")) # noqa 

271 

272 def read_all_list(self) -> list[str]: 

273 """ 

274 Read all the lines in a file. 

275 

276 :return: the list of strings of text 

277 """ 

278 self.enforce_file() 

279 with self.__open_for_read() as reader: 

280 ret = reader.readlines() 

281 if not isinstance(ret, list): 

282 raise type_error(ret, "ret", list) 

283 if len(ret) <= 0: 

284 raise ValueError(f"File '{self}' contains no text.") 

285 return ret 

286 

287 def read_all_str(self) -> str: 

288 """ 

289 Read a file as a single string. 

290 

291 :return: the single string of text 

292 """ 

293 self.enforce_file() 

294 with self.__open_for_read() as reader: 

295 ret = reader.read() 

296 if not isinstance(ret, str): 

297 raise type_error(ret, "ret", str) 

298 if len(ret) <= 0: 

299 raise ValueError(f"File '{self}' contains no text.") 

300 return ret 

301 

302 def __open_for_write(self) -> io.TextIOWrapper: 

303 """ 

304 Open the file for writing. 

305 

306 :return: the text io wrapper for writing 

307 """ 

308 return cast(io.TextIOWrapper, open( # noqa 

309 self, mode="w", encoding="utf-8", errors="strict")) # noqa 

310 

311 def write_all(self, contents: str | Iterable[str]) -> None: 

312 """ 

313 Read all the lines in a file. 

314 

315 :param contents: the contents to write 

316 """ 

317 self.ensure_file_exists() 

318 if not isinstance(contents, str | Iterable): 

319 raise type_error(contents, "contents", (str, Iterable)) 

320 with self.__open_for_write() as writer: 

321 all_text = contents if isinstance(contents, str) \ 

322 else "\n".join(contents) 

323 if len(all_text) <= 0: 

324 raise ValueError("Writing empty text is not permitted.") 

325 all_text = regex_sub(_PATTERN_TRAILING_WHITESPACE, 

326 "\n", all_text.rstrip()) 

327 if len(all_text) <= 0: 

328 raise ValueError( 

329 "Text becomes empty after removing trailing whitespace?") 

330 writer.write(all_text) 

331 if all_text[-1] != "\n": 

332 writer.write("\n") 

333 

334 def as_directory(self) -> "Path": 

335 """ 

336 Return the closest directory along this path. 

337 

338 :return: the directory: either this path if it already identifies a 

339 directory, or the parent directory if this path identifies a file. 

340 :raises ValueError: if no containing directory exists 

341 """ 

342 if os.path.isfile(self): 

343 base_dir = Path.path(os.path.dirname(self)) 

344 else: 

345 base_dir = self 

346 base_dir.enforce_dir() 

347 return base_dir 

348 

349 def resolve_input_file(self, 

350 relative_path: str, 

351 lang: str | None = None) -> "Path": 

352 """ 

353 Resolve a path to an input file relative to this path. 

354 

355 :param relative_path: the relative path to resolve 

356 :param lang: the language to use 

357 :return: the resolved path 

358 :raises ValueError: if the path cannot be resolved to a file 

359 """ 

360 relative_path = enforce_non_empty_str_without_ws(relative_path) 

361 lang = None if lang is None \ 

362 else enforce_non_empty_str_without_ws(lang) 

363 

364 base_dir: Final[Path] = self.as_directory() 

365 candidate: Path 

366 

367 if lang is not None: 

368 prefix, suffix = Path.split_prefix_suffix(relative_path) 

369 candidate = base_dir.resolve_inside(f"{prefix}_{lang}.{suffix}") 

370 if os.path.isfile(candidate): 

371 candidate.__state = 1 

372 return candidate 

373 candidate = base_dir.resolve_inside(relative_path) 

374 candidate.enforce_file() 

375 return candidate 

376 

377 @staticmethod 

378 def split_prefix_suffix(name: str, 

379 enforce_suffix: bool = True) -> tuple[str, str]: 

380 """ 

381 Split the file name 'name' into a prefix and a suffix. 

382 

383 :param name: the file name 

384 :param enforce_suffix: crash if no suffix? 

385 :return: a tuple of [prefix, suffix] 

386 """ 

387 dot: int = name.rfind(".") 

388 if (dot < 0) or (dot >= (len(name) - 1)): 

389 if enforce_suffix: 

390 raise ValueError(f"'{name}' does not have suffix?") 

391 return enforce_non_empty_str_without_ws(name), "" 

392 

393 # check for stuff such as tar.xz and tar.gz 

394 dot2: Final[int] = name.rfind(".", 0, dot - 1) 

395 if 0 < dot2 < dot and name[dot2 + 1:dot] == "tar": 

396 dot = dot2 

397 return enforce_non_empty_str_without_ws(name[:dot]), \ 

398 enforce_non_empty_str_without_ws(name[dot + 1:]) 

399 

400 @staticmethod 

401 def path(path: str) -> "Path": 

402 """ 

403 Get a canonical path. 

404 

405 :param path: the path to canonicalize 

406 :return: the `Path` instance 

407 """ 

408 if isinstance(path, Path): 

409 return cast(Path, path) 

410 return Path(path) 

411 

412 @staticmethod 

413 def file(path: str) -> "Path": 

414 """ 

415 Get a path identifying a file. 

416 

417 :param path: the path 

418 :return: the file 

419 """ 

420 fi: Final[Path] = Path.path(path) 

421 fi.enforce_file() 

422 return fi 

423 

424 @staticmethod 

425 def directory(path: str) -> "Path": 

426 """ 

427 Get a path identifying a directory. 

428 

429 :param path: the path 

430 :return: the file 

431 """ 

432 fi: Final[Path] = Path.path(path) 

433 fi.enforce_dir() 

434 return fi 

435 

436 @staticmethod 

437 def copy_file(source: str, 

438 dest: str) -> "Path": 

439 """ 

440 Copy one file to another one, doing gz-unzipping if necessary. 

441 

442 This method copies a source file to a destination file. 

443 If the source file has suffix "svgz" and the destination file has 

444 suffix "svg" OR if the source file has suffix "gz" and the destination 

445 file has not, then we will unzip the source file to the destination 

446 file. 

447 Otherwise, a normal copy is performed. 

448 

449 :param source: the source file 

450 :param dest: the destination file 

451 :return: the fully-qualified destination path 

452 """ 

453 source_file = Path.file(source) 

454 dest_file = Path.path(dest) 

455 if source_file == dest_file: 

456 raise ValueError(f"Cannot copy file '{dest_file}' into itself.") 

457 _, ssuffix = Path.split_prefix_suffix(source_file, False) 

458 _, dsuffix = Path.split_prefix_suffix(dest_file, False) 

459 if ((ssuffix == "svgz") and (dsuffix == "svg")) or \ 

460 ((ssuffix == "gz") and (dsuffix != "gz")): 

461 copy = _copy_un_gzip 

462 else: 

463 copy = copy_pure 

464 copy(source_file, dest_file) 

465 dest_file.enforce_file() 

466 return dest_file 

467 

468 @staticmethod 

469 def copy_resource(source_dir: str, 

470 input_file: str, 

471 dest_dir: str) -> "Path": 

472 """ 

473 Copy an input file to an destination directory. 

474 

475 :param source_dir: the source directory 

476 :param input_file: the input file 

477 :param dest_dir: the destination directory 

478 :return: the path 

479 """ 

480 in_dir = Path.path(source_dir) 

481 in_dir.enforce_dir() 

482 in_file = Path.path(input_file) 

483 in_file.enforce_file() 

484 out_dir = Path.path(dest_dir) 

485 out_dir.enforce_dir() 

486 in_dir.enforce_neither_contains(out_dir) 

487 

488 rel_path = in_file.relative_to(in_dir) 

489 prefix, suffix = Path.split_prefix_suffix(rel_path) 

490 if suffix == "svgz": 

491 rel_path = f"{prefix}.svg" 

492 

493 out_path = out_dir.resolve_inside(rel_path) 

494 inner_dir = Path.path(os.path.dirname(out_path)) 

495 out_dir.enforce_contains(inner_dir) 

496 inner_dir.ensure_dir_exists() 

497 return Path.copy_file(in_file, out_path)