"""The base class with the information of a build."""
import codecs
import gzip
import io
import os.path
import shutil
from re import MULTILINE
from re import compile as _compile
from typing import Final, Iterable, Pattern, cast
from bookbuilderpy.strings import enforce_non_empty_str_without_ws, regex_sub
from bookbuilderpy.types import type_error
def _canonicalize_path(path: str) -> str:
    """
    Validate a path string and convert it to its canonical form.

    The path has environment variables and the user home expanded, is
    resolved via `os.path.realpath`, made absolute, and finally passed
    through `os.path.normcase`.

    :param path: the path
    :return: the canonicalized path
    :raises ValueError: if `path` is empty or if the canonical form is
        empty, `"."`, or `".."`
    """
    if not isinstance(path, str):
        raise type_error(path, "path", str)
    if not path:
        raise ValueError("Path must not be empty.")

    # apply the canonicalization steps from the innermost to the outermost
    expanded = os.path.expanduser(os.path.expandvars(path))
    absolute = os.path.abspath(os.path.realpath(expanded))
    canonical = os.path.normcase(absolute)

    # sanity checks on the result of the canonicalization
    if not isinstance(canonical, str):
        raise type_error(canonical, "canonicalized path", str)
    if not canonical:
        raise ValueError("Canonicalization must yield non-empty string, "
                         f"but returned '{canonical}'.")
    if canonical in (".", ".."):
        raise ValueError(f"Canonicalization cannot yield '{canonical}'.")
    return canonical
def copy_pure(path_in: str, path_out: str) -> "Path":
    """
    Copy a file verbatim, without any decompression.

    The copy is delegated to `shutil.copyfile`; its return value is then
    wrapped into a `Path` that is checked to identify an existing file.

    :param path_in: the path to the input file
    :param path_out: the path to the output file
    :returns: the path to the new file
    :raises ValueError: if the destination does not identify a file after
        the copy
    """
    copied = shutil.copyfile(path_in, path_out)
    return Path.file(str(copied))
def move_pure(path_in: str, path_out: str) -> "Path":
    """
    Move a file.

    The move is delegated to `shutil.move`; afterwards the destination is
    checked to identify an existing file.

    :param path_in: the path to the input file
    :param path_out: the path to the output file
    :returns: the path to the moved file
    :raises ValueError: if `path_out` does not identify a file after the
        move
    """
    po = Path.path(path_out)
    shutil.move(path_in, po)
    po.enforce_file()
    return po
def _copy_un_gzip(path_in: str, path_out: str) -> "Path":
    """
    Decompress a gzip-compressed file into a destination file.

    :param path_in: the path to the gzip-compressed input file
    :param path_out: the path to the (uncompressed) output file
    :returns: the path to the new file
    :raises ValueError: if the destination does not identify a file after
        the data has been written
    """
    target = Path.path(path_out)
    with gzip.open(path_in, "rb") as compressed:
        with open(target, "wb") as plain:
            # stream the decompressed bytes into the destination
            shutil.copyfileobj(compressed, plain)
    target.enforce_file()
    return target
#: a pattern used to clean up trailing white space
_PATTERN_TRAILING_WHITESPACE: Final[Pattern] = _compile(
    r"[ \t]+\n", MULTILINE)

#: the codec name used for UTF-8 text (the BOM-aware "utf-8-sig" variant)
UTF8: Final[str] = "utf-8-sig"

#: The list of possible text encodings: each entry pairs the byte order
#: marks of an encoding with the name of the codec to use. UTF-32 is listed
#: before UTF-16 because the UTF-32-LE mark starts with the UTF-16-LE mark.
__ENCODINGS: Final[tuple[tuple[tuple[bytes, ...], str], ...]] = (
    ((codecs.BOM_UTF8,), UTF8),
    ((codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE), "utf-32"),
    ((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE), "utf-16"),
)
def _get_text_encoding(filename: str) -> str:
    """
    Determine the text encoding of a file from its byte order mark.

    Adapted from https://stackoverflow.com/questions/13590749.

    :param filename: the filename
    :return: the name of the encoding whose byte order mark starts the
        file, or `UTF8` if no known mark is found
    """
    with open(filename, "rb") as stream:
        head = stream.read(4)  # no known mark is longer than four bytes
    for signatures, codec in __ENCODINGS:
        # `startswith` accepts the whole tuple of candidate marks at once
        if head.startswith(signatures):
            return codec
    return UTF8
class Path(str):
    """
    An immutable representation of a path.

    A `Path` is a string holding the canonicalized form of the path it was
    created from. It additionally caches whether it was already verified to
    be a file or a directory.
    """

    #: the common path version of this path, if any
    __common: str | None
    #: the internal state: 0=don't know, 1=file, 2=dir
    __state: int

    def __new__(cls, value):
        """
        Construct the object.

        :param value: the string value, which is canonicalized before it is
            stored
        """
        ret = super().__new__(cls, _canonicalize_path(value))
        ret.__common = None
        ret.__state = 0
        return ret

    def enforce_file(self) -> None:
        """
        Enforce that a path references an existing file.

        A successful check is cached, so the file system is only queried
        while the state of this path is still unknown.

        :raises ValueError: if `path` does not reference an existing file
        """
        if self.__state == 0 and os.path.isfile(self):
            self.__state = 1
        if self.__state != 1:
            raise ValueError(f"Path '{self}' does not identify a file.")

    def enforce_dir(self) -> None:
        """
        Enforce that a path references an existing directory.

        A successful check is cached, so the file system is only queried
        while the state of this path is still unknown.

        :raises ValueError: if `path` does not reference an existing
            directory
        """
        if self.__state == 0 and os.path.isdir(self):
            self.__state = 2
        if self.__state != 2:
            raise ValueError(
                f"Path '{self}' does not identify a directory.")

    def contains(self, other: str) -> bool:
        """
        Check whether another path is contained in this path.

        :param other: the other path
        :return: `True` is this path contains the other path, `False` if
            not
        """
        if self == other:
            return True
        if not os.path.isdir(self):
            return False  # only directories can contain other paths
        if self.__common is None:
            self.__common = os.path.commonpath([self])
        # `other` is inside this path if their common path is this path
        return self.__common == os.path.commonpath(
            [self, Path.path(other)])

    def enforce_contains(self, other: str) -> None:
        """
        Raise an exception if this path does not contain the other path.

        :param other: the other path
        :raises ValueError: if this path is not a directory or if `other`
            is not a sub-path of this path
        """
        self.enforce_dir()
        if not self.contains(other):
            raise ValueError(f"Path '{self}' does not contain '{other}'.")

    def enforce_neither_contains(self, other: str) -> None:
        """
        Enforce that neither path contains another one.

        :param other: the other path
        :raises ValueError: if `other` is contained in this path or vice
            versa
        """
        if self.__common is None:
            self.__common = os.path.commonpath([self])
        opath: Final[Path] = Path.path(other)
        joint: Final[str] = os.path.commonpath([self, opath])
        if joint == self.__common:
            raise ValueError(f"Path '{self}' contains '{opath}'.")
        if opath.__common is None:
            opath.__common = os.path.commonpath([opath])
        if joint == opath.__common:
            raise ValueError(f"Path '{opath}' contains '{self}'.")

    def relative_to(self, base_path: str) -> str:
        """
        Compute a relative path of this path towards the given base path.

        :param base_path: the base path
        :return: a relative path
        :raises ValueError: if this path is not inside `base_path`
        """
        opath: Final[Path] = Path.path(base_path)
        opath.enforce_contains(self)
        return enforce_non_empty_str_without_ws(
            os.path.relpath(self, opath))

    def resolve_inside(self, relative_path: str) -> "Path":
        """
        Resolve a relative path to an absolute path inside this path.

        :param relative_path: the path to resolve
        :return: the resolved child path
        :raises ValueError: If the path would resolve to something outside
            of this path and/or if it is empty.
        """
        opath: Final[Path] = Path.path(os.path.join(
            self, enforce_non_empty_str_without_ws(relative_path)))
        self.enforce_contains(opath)
        return opath

    def ensure_file_exists(self) -> bool:
        """
        Atomically ensure that the file exists and create it otherwise.

        :return: `True` if the file already existed and
            `False` if it was newly and atomically created.
        :raises ValueError: if anything goes wrong during the file creation
            or if the path does not identify a file afterwards
        """
        existed: bool = False
        try:
            # O_EXCL makes the creation fail if the file is already there
            os.close(os.open(self, os.O_CREAT | os.O_EXCL))
        except FileExistsError:
            existed = True
        except ValueError:
            raise
        except Exception as err:
            raise ValueError(
                f"Error when trying to create file '{self}'.") from err
        self.enforce_file()
        return existed

    def ensure_dir_exists(self) -> None:
        """
        Make sure that the directory exists, create it otherwise.

        :raises ValueError: if the directory cannot be created or if the
            path does not identify a directory afterwards
        """
        try:
            os.makedirs(name=self, exist_ok=True)
        except FileExistsError:
            pass  # the check below reports a non-directory in the way
        except ValueError:
            raise
        except Exception as err:
            raise ValueError(
                f"Error when trying to create directory '{self}'.") from err
        self.enforce_dir()

    def __open_for_read(self) -> io.TextIOWrapper:
        """
        Open this file for reading.

        The encoding is chosen by `_get_text_encoding`.

        :return: the file open for reading
        """
        return cast(io.TextIOWrapper, open(  # noqa
            self, encoding=_get_text_encoding(self),  # noqa
            errors="strict"))  # noqa

    def read_all_list(self) -> list[str]:
        """
        Read all the lines in a file.

        :return: the list of strings of text, as produced by `readlines`
        :raises ValueError: if the path is not a file or the file contains
            no text
        """
        self.enforce_file()
        with self.__open_for_read() as reader:
            ret = reader.readlines()
        if not isinstance(ret, list):
            raise type_error(ret, "ret", list)
        if len(ret) <= 0:
            raise ValueError(f"File '{self}' contains no text.")
        return ret

    def read_all_str(self) -> str:
        """
        Read a file as a single string.

        :return: the single string of text
        :raises ValueError: if the path is not a file or the file contains
            no text
        """
        self.enforce_file()
        with self.__open_for_read() as reader:
            ret = reader.read()
        if not isinstance(ret, str):
            raise type_error(ret, "ret", str)
        if len(ret) <= 0:
            raise ValueError(f"File '{self}' contains no text.")
        return ret

    def __open_for_write(self) -> io.TextIOWrapper:
        """
        Open the file for writing.

        :return: the text io wrapper for writing
        """
        return cast(io.TextIOWrapper, open(  # noqa
            self, mode="w", encoding="utf-8", errors="strict"))  # noqa

    def write_all(self, contents: str | Iterable[str]) -> None:
        """
        Write text to this file, replacing its previous contents.

        An iterable of strings is joined with newlines. The text is
        right-stripped, passed through `regex_sub` with the
        trailing-whitespace pattern, and written followed by a newline.
        The text is fully validated before the file is created or opened,
        so invalid input never truncates an existing file.

        :param contents: the contents to write
        :raises ValueError: if the text is empty or becomes empty after
            the clean-up
        """
        if not isinstance(contents, (str, Iterable)):
            raise type_error(contents, "contents", (str, Iterable))
        all_text = contents if isinstance(contents, str) \
            else "\n".join(contents)
        if len(all_text) <= 0:
            raise ValueError("Writing empty text is not permitted.")
        all_text = regex_sub(_PATTERN_TRAILING_WHITESPACE,
                             "\n", all_text.rstrip())
        if len(all_text) <= 0:
            raise ValueError(
                "Text becomes empty after removing trailing whitespace?")

        # only touch the file system once the text is known to be valid
        self.ensure_file_exists()
        with self.__open_for_write() as writer:
            writer.write(all_text)
            if all_text[-1] != "\n":
                writer.write("\n")

    def as_directory(self) -> "Path":
        """
        Return the closest directory along this path.

        :return: the directory: either this path if it already identifies
            a directory, or the parent directory if this path identifies a
            file.
        :raises ValueError: if no containing directory exists
        """
        if os.path.isfile(self):
            base_dir = Path.path(os.path.dirname(self))
        else:
            base_dir = self
        base_dir.enforce_dir()
        return base_dir

    @staticmethod
    def split_prefix_suffix(name: str,
                            enforce_suffix: bool = True) -> tuple[str, str]:
        """
        Split the file name 'name' into a prefix and a suffix.

        The suffix is the text after the last dot. If the part before it
        ends in `.tar` (e.g., `x.tar.gz`), the suffix includes `tar.`.
        A dot inside a directory component of `name` is never treated as
        the suffix separator.

        :param name: the file name
        :param enforce_suffix: crash if no suffix?
        :return: a tuple of [prefix, suffix]
        :raises ValueError: if there is no suffix and `enforce_suffix` is
            `True`
        """
        dot: int = name.rfind(".")
        # position of the last path separator, -1 if there is none
        last_sep: int = name.rfind(os.path.sep)
        if os.path.altsep:
            last_sep = max(last_sep, name.rfind(os.path.altsep))
        if (dot < 0) or (dot >= (len(name) - 1)) or (dot < last_sep):
            if enforce_suffix:
                raise ValueError(f"'{name}' does not have suffix?")
            return enforce_non_empty_str_without_ws(name), ""

        # check for stuff such as tar.xz and tar.gz
        dot2: Final[int] = name.rfind(".", 0, dot - 1)
        if 0 < dot2 < dot and name[dot2 + 1:dot] == "tar":
            dot = dot2
        return enforce_non_empty_str_without_ws(name[:dot]), \
            enforce_non_empty_str_without_ws(name[dot + 1:])

    @staticmethod
    def path(path: str) -> "Path":
        """
        Get a canonical path.

        :param path: the path to canonicalize
        :return: the `Path` instance; `path` itself if it already is one
        """
        if isinstance(path, Path):
            return cast(Path, path)
        return Path(path)

    @staticmethod
    def file(path: str) -> "Path":
        """
        Get a path identifying a file.

        :param path: the path
        :return: the file
        :raises ValueError: if the path does not identify a file
        """
        fi: Final[Path] = Path.path(path)
        fi.enforce_file()
        return fi

    @staticmethod
    def directory(path: str) -> "Path":
        """
        Get a path identifying a directory.

        :param path: the path
        :return: the directory
        :raises ValueError: if the path does not identify a directory
        """
        fi: Final[Path] = Path.path(path)
        fi.enforce_dir()
        return fi

    @staticmethod
    def copy_file(source: str,
                  dest: str) -> "Path":
        """
        Copy one file to another one, doing gz-unzipping if necessary.

        This method copies a source file to a destination file.
        If the source file has suffix "svgz" and the destination file has
        suffix "svg" OR if the source file has suffix "gz" and the
        destination file has not, then we will unzip the source file to the
        destination file.
        Otherwise, a normal copy is performed.

        :param source: the source file
        :param dest: the destination file
        :return: the fully-qualified destination path
        :raises ValueError: if source and destination are the same path
        """
        source_file = Path.file(source)
        dest_file = Path.path(dest)
        if source_file == dest_file:
            raise ValueError(
                f"Cannot copy file '{dest_file}' into itself.")
        _, ssuffix = Path.split_prefix_suffix(source_file, False)
        _, dsuffix = Path.split_prefix_suffix(dest_file, False)
        if ((ssuffix == "svgz") and (dsuffix == "svg")) or \
                ((ssuffix == "gz") and (dsuffix != "gz")):
            copy = _copy_un_gzip
        else:
            copy = copy_pure
        copy(source_file, dest_file)
        dest_file.enforce_file()
        return dest_file

    @staticmethod
    def copy_resource(source_dir: str,
                      input_file: str,
                      dest_dir: str) -> "Path":
        """
        Copy an input file to a destination directory.

        The file keeps its path relative to `source_dir` inside `dest_dir`.
        A file with suffix "svgz" gets the suffix "svg" at the destination.

        :param source_dir: the source directory
        :param input_file: the input file
        :param dest_dir: the destination directory
        :return: the path
        """
        in_dir = Path.path(source_dir)
        in_dir.enforce_dir()
        in_file = Path.path(input_file)
        in_file.enforce_file()
        out_dir = Path.path(dest_dir)
        out_dir.enforce_dir()
        in_dir.enforce_neither_contains(out_dir)

        rel_path = in_file.relative_to(in_dir)
        prefix, suffix = Path.split_prefix_suffix(rel_path)
        if suffix == "svgz":
            rel_path = f"{prefix}.svg"
        out_path = out_dir.resolve_inside(rel_path)
        inner_dir = Path.path(os.path.dirname(out_path))
        out_dir.enforce_contains(inner_dir)
        inner_dir.ensure_dir_exists()
        return Path.copy_file(in_file, out_path)