Coverage for bookbuilderpy/path.py: 86%
231 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-17 23:15 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-17 23:15 +0000
1"""The base class with the information of a build."""
3import codecs
4import gzip
5import io
6import os.path
7import shutil
8from re import MULTILINE
9from re import compile as _compile
10from typing import Final, Iterable, Pattern, cast
12from bookbuilderpy.strings import enforce_non_empty_str_without_ws, regex_sub
13from bookbuilderpy.types import type_error
def _canonicalize_path(path: str) -> str:
    """
    Validate a path string and bring it into its canonical form.

    The canonical form is obtained by expanding environment variables and
    a leading user-home marker, resolving symbolic links, making the path
    absolute, and finally normalizing its case.

    :param path: the path to canonicalize
    :return: the canonical version of the path
    :raises ValueError: if `path` is empty or if the canonicalization
        yields an empty string, `.`, or `..`
    """
    if not isinstance(path, str):
        raise type_error(path, "path", str)
    if not path:
        raise ValueError("Path must not be empty.")

    # step 1: textual expansion of variables and of the home directory
    expanded = os.path.expanduser(os.path.expandvars(path))
    # step 2: resolve links, make absolute, normalize the case
    canonical = os.path.normcase(
        os.path.abspath(os.path.realpath(expanded)))

    # sanity checks on the result of the os.path functions
    if not isinstance(canonical, str):
        raise type_error(canonical, "canonicalized path", str)
    if not canonical:
        raise ValueError("Canonicalization must yield non-empty string, "
                         f"but returned '{canonical}'.")
    if canonical in (".", ".."):
        raise ValueError(f"Canonicalization cannot yield '{canonical}'.")
    return canonical
def copy_pure(path_in: str, path_out: str) -> "Path":
    """
    Copy a file as-is, without any transformation of its contents.

    The copy is delegated to `shutil.copyfile`; the destination it reports
    is then converted to a `Path` that must identify an existing file.

    :param path_in: the path to the input file
    :param path_out: the path to the output file
    :returns: the path to the new file
    """
    copied = shutil.copyfile(path_in, path_out)
    return Path.file(str(copied))
def move_pure(path_in: str, path_out: str) -> "Path":
    """
    Move a file to a new location.

    The move is delegated to `shutil.move`. Afterwards, the destination
    must identify an existing file.

    :param path_in: the path to the input file
    :param path_out: the path to the output file
    :returns: the path to the new file
    :raises ValueError: if the destination is not a file after the move
    """
    target = Path.path(path_out)
    shutil.move(path_in, target)
    target.enforce_file()
    return target
def _copy_un_gzip(path_in: str, path_out: str) -> "Path":
    """
    Decompress a gzip-compressed file into a destination file.

    :param path_in: the path to the (gzip-compressed) input file
    :param path_out: the path to the (uncompressed) output file
    :returns: the path to the new file
    :raises ValueError: if the destination is not a file after the copy
    """
    target = Path.path(path_out)
    with gzip.open(path_in, "rb") as compressed:
        with open(target, "wb") as plain:
            # stream the decompressed bytes to the destination
            shutil.copyfileobj(compressed, plain)
    target.enforce_file()
    return target
#: a pattern used to clean up trailing white space, i.e., it matches any
#: run of spaces and/or tabs that is directly followed by a newline
_PATTERN_TRAILING_WHITESPACE: Final[Pattern] = _compile(
    r"[ \t]+\n", flags=MULTILINE)
#: the UTF-8 encoding (the "-sig" codec variant)
UTF8: Final[str] = "utf-8-sig"

#: The list of possible text encodings: each entry pairs the byte order
#: marks that identify an encoding with the name of that encoding.
#: The UTF-32 entry must come before the UTF-16 entry, because the
#: little-endian UTF-32 mark begins with the little-endian UTF-16 mark.
__ENCODINGS: Final[tuple[tuple[tuple[bytes, ...], str], ...]] = (
    ((codecs.BOM_UTF8,), UTF8),
    ((codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE), "utf-32"),
    ((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE), "utf-16"),
)


def _get_text_encoding(filename: str) -> str:
    """
    Determine the text encoding of a file from its byte order mark.

    Only the first four bytes of the file are inspected. If they begin
    with none of the known byte order marks, `UTF8` is returned.

    Adapted from https://stackoverflow.com/questions/13590749.

    :param filename: the filename
    :return: the encoding
    """
    with open(filename, "rb") as stream:
        head = stream.read(4)  # no known mark is longer than four bytes

    for marks, codec in __ENCODINGS:
        # `startswith` accepts a tuple and tests each mark in turn
        if head.startswith(marks):
            return codec
    return UTF8
class Path(str):
    """
    An immutable representation of a canonical path.

    Each instance is a `str` holding the result of `_canonicalize_path`.
    In addition to the text, an instance lazily caches its
    `os.path.commonpath` form and remembers whether it was already
    confirmed to be a file or a directory. A confirmed state is cached
    and not checked against the file system again.
    """

    #: the common path version of this path, if any
    __common: str | None
    #: the internal state: 0=don't know, 1=file, 2=dir
    __state: int

    def __new__(cls, value):
        """
        Construct the object.

        :param value: the string value, which is canonicalized
        """
        ret = super().__new__(cls, _canonicalize_path(value))
        ret.__common = None
        ret.__state = 0
        return ret

    def enforce_file(self) -> None:
        """
        Enforce that a path references an existing file.

        :raises ValueError: if `path` does not reference an existing file
        """
        # only query the file system while the state is still unknown
        if self.__state == 0 and os.path.isfile(self):
            self.__state = 1
        if self.__state != 1:
            raise ValueError(f"Path '{self}' does not identify a file.")

    def enforce_dir(self) -> None:
        """
        Enforce that a path references an existing directory.

        :raises ValueError: if `path` does not reference an existing directory
        """
        # only query the file system while the state is still unknown
        if self.__state == 0 and os.path.isdir(self):
            self.__state = 2
        if self.__state != 2:
            raise ValueError(f"Path '{self}' does not identify a directory.")

    def contains(self, other: str) -> bool:
        """
        Check whether another path is contained in this path.

        A path always contains itself. Otherwise, this path must be an
        existing directory and be the common path of itself and `other`.

        :param other: the other path
        :return: `True` if this path contains the other path, `False` if not
        """
        if self == other:
            return True
        if not os.path.isdir(self):
            return False
        if self.__common is None:
            self.__common = os.path.commonpath([self])
        return self.__common == os.path.commonpath([self, Path.path(other)])

    def enforce_contains(self, other: str) -> None:
        """
        Raise an exception if this path does not contain the other path.

        :param other: the other path
        :raises ValueError: if this path is not a directory or if `other`
            is not a sub-path of this path
        """
        self.enforce_dir()
        if not self.contains(other):
            raise ValueError(f"Path '{self}' does not contain '{other}'.")

    def enforce_neither_contains(self, other: str) -> None:
        """
        Enforce that neither path contains another one.

        :param other: the other path
        :raises ValueError: if `other` is contained in this path or vice versa
        """
        if self.__common is None:
            self.__common = os.path.commonpath([self])
        opath: Final[Path] = Path.path(other)
        # if the joint common path equals one of the two paths, then that
        # path is a prefix of (i.e., contains) the other one
        joint: Final[str] = os.path.commonpath([self, opath])
        if joint == self.__common:
            raise ValueError(f"Path '{self}' contains '{opath}'.")
        if opath.__common is None:
            opath.__common = os.path.commonpath([opath])
        if joint == opath.__common:
            raise ValueError(f"Path '{opath}' contains '{self}'.")

    def relative_to(self, base_path: str) -> str:
        """
        Compute a relative path of this path towards the given base path.

        :param base_path: the base path, which must contain this path
        :return: a relative path
        :raises ValueError: if this path is not inside `base_path`
        """
        opath: Final[Path] = Path.path(base_path)
        opath.enforce_contains(self)
        return enforce_non_empty_str_without_ws(
            os.path.relpath(self, opath))

    def resolve_inside(self, relative_path: str) -> "Path":
        """
        Resolve a relative path to an absolute path inside this path.

        :param relative_path: the path to resolve
        :return: the resolved child path
        :raises ValueError: If the path would resolve to something outside of
            this path and/or if it is empty.
        """
        opath: Final[Path] = Path.path(os.path.join(
            self, enforce_non_empty_str_without_ws(relative_path)))
        self.enforce_contains(opath)
        return opath

    def ensure_file_exists(self) -> bool:
        """
        Atomically ensure that the file exists and create it otherwise.

        :return: `True` if the file already existed and
            `False` if it was newly and atomically created.
        :raises: ValueError if anything goes wrong during the file creation
        """
        existed: bool = False
        try:
            # O_CREAT | O_EXCL fails if the file is already there
            os.close(os.open(self, os.O_CREAT | os.O_EXCL))
        except FileExistsError:
            existed = True
        except ValueError:
            raise
        except Exception as err:
            raise ValueError(
                f"Error when trying to create file '{self}'.") from err
        self.enforce_file()
        return existed

    def ensure_dir_exists(self) -> None:
        """
        Make sure that the directory exists, create it otherwise.

        :raises ValueError: if the directory cannot be created or if the
            path does not identify a directory afterwards
        """
        try:
            os.makedirs(name=self, exist_ok=True)
        except FileExistsError:
            # deliberately ignored: `enforce_dir` below reports the problem
            pass
        except ValueError:
            raise
        except Exception as err:
            raise ValueError(
                f"Error when trying to create directory '{self}'.") from err
        self.enforce_dir()

    def __open_for_read(self) -> io.TextIOWrapper:
        """
        Open this file for reading.

        The encoding is detected from the byte order mark of the file.

        :return: the file open for reading
        """
        return cast(io.TextIOWrapper, open(  # noqa
            self, encoding=_get_text_encoding(self),  # noqa
            errors="strict"))  # noqa

    def read_all_list(self) -> list[str]:
        """
        Read all the lines in a file.

        :return: the list of strings of text
        :raises ValueError: if this path is not a file or the file is empty
        """
        self.enforce_file()
        with self.__open_for_read() as reader:
            ret = reader.readlines()
        if not isinstance(ret, list):
            raise type_error(ret, "ret", list)
        if len(ret) <= 0:
            raise ValueError(f"File '{self}' contains no text.")
        return ret

    def read_all_str(self) -> str:
        """
        Read a file as a single string.

        :return: the single string of text
        :raises ValueError: if this path is not a file or the file is empty
        """
        self.enforce_file()
        with self.__open_for_read() as reader:
            ret = reader.read()
        if not isinstance(ret, str):
            raise type_error(ret, "ret", str)
        if len(ret) <= 0:
            raise ValueError(f"File '{self}' contains no text.")
        return ret

    def __open_for_write(self) -> io.TextIOWrapper:
        """
        Open the file for writing.

        :return: the text io wrapper for writing
        """
        return cast(io.TextIOWrapper, open(  # noqa
            self, mode="w", encoding="utf-8", errors="strict"))  # noqa

    def write_all(self, contents: str | Iterable[str]) -> None:
        """
        Write the given text to the file, replacing any previous contents.

        If `contents` is not a single string, its elements are joined with
        newlines. Trailing white space is removed from the text and the
        written text always ends with a newline character.

        The text is completely prepared and validated *before* the file is
        created or opened, so that a rejected write neither truncates an
        existing file nor leaves an empty new file behind.

        :param contents: the contents to write
        :raises ValueError: if the text is empty, becomes empty after
            removing trailing white space, or the file cannot be created
        """
        if not isinstance(contents, str | Iterable):
            raise type_error(contents, "contents", (str, Iterable))
        all_text = contents if isinstance(contents, str) \
            else "\n".join(contents)
        if len(all_text) <= 0:
            raise ValueError("Writing empty text is not permitted.")
        all_text = regex_sub(_PATTERN_TRAILING_WHITESPACE,
                             "\n", all_text.rstrip())
        if len(all_text) <= 0:
            raise ValueError(
                "Text becomes empty after removing trailing whitespace?")

        # only now touch the file system: opening with mode "w" truncates
        self.ensure_file_exists()
        with self.__open_for_write() as writer:
            writer.write(all_text)
            if all_text[-1] != "\n":
                writer.write("\n")

    def as_directory(self) -> "Path":
        """
        Return the closest directory along this path.

        :return: the directory: either this path if it already identifies a
            directory, or the parent directory if this path identifies a file.
        :raises ValueError: if no containing directory exists
        """
        if os.path.isfile(self):
            base_dir = Path.path(os.path.dirname(self))
        else:
            base_dir = self
        base_dir.enforce_dir()
        return base_dir

    def resolve_input_file(self,
                           relative_path: str,
                           lang: str | None = None) -> "Path":
        """
        Resolve a path to an input file relative to this path.

        If a language is given, a file named `prefix_lang.suffix` is
        preferred over the plain `prefix.suffix` if it exists.

        :param relative_path: the relative path to resolve
        :param lang: the language to use
        :return: the resolved path
        :raises ValueError: if the path cannot be resolved to a file
        """
        relative_path = enforce_non_empty_str_without_ws(relative_path)
        lang = None if lang is None \
            else enforce_non_empty_str_without_ws(lang)

        base_dir: Final[Path] = self.as_directory()
        candidate: Path

        if lang is not None:
            # first try the language-specific variant of the file
            prefix, suffix = Path.split_prefix_suffix(relative_path)
            candidate = base_dir.resolve_inside(f"{prefix}_{lang}.{suffix}")
            if os.path.isfile(candidate):
                candidate.__state = 1
                return candidate
        candidate = base_dir.resolve_inside(relative_path)
        candidate.enforce_file()
        return candidate

    @staticmethod
    def split_prefix_suffix(name: str,
                            enforce_suffix: bool = True) -> tuple[str, str]:
        """
        Split the file name 'name' into a prefix and a suffix.

        The suffix is the text after the last dot. If the part directly
        before it is `tar` (as in `tar.gz` or `tar.xz`), then that part
        belongs to the suffix as well.

        :param name: the file name
        :param enforce_suffix: crash if no suffix?
        :return: a tuple of [prefix, suffix]
        :raises ValueError: if `name` has no suffix and `enforce_suffix`
            is `True`
        """
        dot: int = name.rfind(".")
        if (dot < 0) or (dot >= (len(name) - 1)):
            if enforce_suffix:
                raise ValueError(f"'{name}' does not have suffix?")
            return enforce_non_empty_str_without_ws(name), ""

        # check for stuff such as tar.xz and tar.gz
        dot2: Final[int] = name.rfind(".", 0, dot - 1)
        if 0 < dot2 < dot and name[dot2 + 1:dot] == "tar":
            dot = dot2
        return enforce_non_empty_str_without_ws(name[:dot]), \
            enforce_non_empty_str_without_ws(name[dot + 1:])

    @staticmethod
    def path(path: str) -> "Path":
        """
        Get a canonical path.

        If `path` already is a `Path` instance, it is returned unchanged.

        :param path: the path to canonicalize
        :return: the `Path` instance
        """
        if isinstance(path, Path):
            return cast(Path, path)
        return Path(path)

    @staticmethod
    def file(path: str) -> "Path":
        """
        Get a path identifying a file.

        :param path: the path
        :return: the file
        :raises ValueError: if `path` does not identify an existing file
        """
        fi: Final[Path] = Path.path(path)
        fi.enforce_file()
        return fi

    @staticmethod
    def directory(path: str) -> "Path":
        """
        Get a path identifying a directory.

        :param path: the path
        :return: the directory
        :raises ValueError: if `path` does not identify an existing
            directory
        """
        fi: Final[Path] = Path.path(path)
        fi.enforce_dir()
        return fi

    @staticmethod
    def copy_file(source: str,
                  dest: str) -> "Path":
        """
        Copy one file to another one, doing gz-unzipping if necessary.

        This method copies a source file to a destination file.
        If the source file has suffix "svgz" and the destination file has
        suffix "svg" OR if the source file has suffix "gz" and the destination
        file has not, then we will unzip the source file to the destination
        file.
        Otherwise, a normal copy is performed.

        :param source: the source file
        :param dest: the destination file
        :return: the fully-qualified destination path
        :raises ValueError: if source and destination are the same path or
            if the destination is not a file after copying
        """
        source_file = Path.file(source)
        dest_file = Path.path(dest)
        if source_file == dest_file:
            raise ValueError(f"Cannot copy file '{dest_file}' into itself.")
        _, ssuffix = Path.split_prefix_suffix(source_file, False)
        _, dsuffix = Path.split_prefix_suffix(dest_file, False)
        if ((ssuffix == "svgz") and (dsuffix == "svg")) or \
                ((ssuffix == "gz") and (dsuffix != "gz")):
            copy = _copy_un_gzip
        else:
            copy = copy_pure
        copy(source_file, dest_file)
        dest_file.enforce_file()
        return dest_file

    @staticmethod
    def copy_resource(source_dir: str,
                      input_file: str,
                      dest_dir: str) -> "Path":
        """
        Copy an input file to a destination directory.

        The path of the input file relative to the source directory is
        re-created inside the destination directory. A file with suffix
        "svgz" is stored with suffix "svg" instead.

        :param source_dir: the source directory
        :param input_file: the input file
        :param dest_dir: the destination directory
        :return: the path
        """
        in_dir = Path.path(source_dir)
        in_dir.enforce_dir()
        in_file = Path.path(input_file)
        in_file.enforce_file()
        out_dir = Path.path(dest_dir)
        out_dir.enforce_dir()
        in_dir.enforce_neither_contains(out_dir)

        rel_path = in_file.relative_to(in_dir)
        prefix, suffix = Path.split_prefix_suffix(rel_path)
        if suffix == "svgz":
            rel_path = f"{prefix}.svg"

        out_path = out_dir.resolve_inside(rel_path)
        inner_dir = Path.path(os.path.dirname(out_path))
        out_dir.enforce_contains(inner_dir)
        inner_dir.ensure_dir_exists()
        return Path.copy_file(in_file, out_path)