"""A post-processed representation of repository files."""
import json
import os
from contextlib import AbstractContextManager
from os.path import getsize
from shutil import rmtree
from tempfile import mkstemp
from typing import Final, Iterable
from pycommons.io.console import logger
from pycommons.io.path import Path, file_path, write_lines
from pycommons.net.url import URL
from pycommons.processes.python import PYTHON_ENV, PYTHON_INTERPRETER
from pycommons.processes.shell import STREAM_CAPTURE, Command
from pycommons.types import type_error
from latexgit.repository.fix_path import replace_base_path
from latexgit.repository.gitmanager import GitManager
def _str_tuple(command: None | str | Iterable[str],
none_ok: bool = True,
empty_ok: bool = True,
raise_errors: bool = False) -> tuple[str, ...] | None:
"""
Get a string tuple from some source.
:param command: the original command
:param none_ok: is `None` OK (and should lead to an empty tuple returned)?
:param empty_ok: is it OK if an empty tuple is returned?
:param raise_errors: should we raise errors or just return `None`?
:return: the tuple
"""
if command is None:
if (not raise_errors) or (none_ok and empty_ok):
return ()
if none_ok:
raise ValueError(
f"Empty command not permitted, but got {command!r}.")
if empty_ok:
raise ValueError("None is not OK as command.")
raise ValueError(
f"Neither None nor empty commands are OK, but got {command!r}.")
res: list[str] = []
if isinstance(command, str):
command = [command]
if not isinstance(command, Iterable):
if raise_errors:
raise type_error(command, "command", Iterable)
return None
for i, o in enumerate(command):
if not isinstance(o, str):
if raise_errors:
raise type_error(o, f"command[{i}]", str)
return None
use_o = str.strip(o)
if str.__len__(use_o) > 0:
res.append(use_o)
if list.__len__(res) <= 0:
if not empty_ok:
if raise_errors:
raise ValueError(
f"Empty command not OK, but got {command!r}.")
return None
return ()
return tuple(res)
def _write(orig: str, dest: Path) -> None:
"""
Write the string to the destination.
:param orig: the original string
:param dest: the destination
"""
orig = str.rstrip(orig)
with dest.open_for_write() as output:
write_lines(map(str.rstrip, str.rstrip(orig).splitlines()), output)
logger("Wrote r-stripped string of originally "
f"{str.__len__(orig)} characters to {dest!r}, "
f"produced file of size {getsize(dest)} bytes.")
[docs]
class Processed(AbstractContextManager):
"""A manager for processed files."""
def __init__(self, base_dir: str) -> None:
"""
Initialize the post processed manager.
:param base_dir: the base directory
"""
#: is the processor still open?
self.__is_open: bool = True
#: the base path of the processor
self.base_path: Final[Path] = Path(base_dir)
self.base_path.ensure_dir_exists()
#: the internal repository manager
self.__git: Final[GitManager] = GitManager(
self.base_path.resolve_inside("git"))
#: the directory to store post-processed stuff
self.__cache_dir: Final[Path] = self.base_path.resolve_inside(
".cache")
self.__cache_dir.ensure_dir_exists()
#: the mapping of post-processing commands and resources
self.__cache_mapped: Final[dict[
tuple[Path, tuple[str, ...]], Path]] = {}
#: the cache file
self.__cache_list: Final[Path] = self.__cache_dir.resolve_inside(
".cache_list.json")
#: load the cache list
self.__cache_list.ensure_file_exists()
if getsize(self.__cache_list) > 0: # file size > 0
s = self.__cache_list.read_all_str()
if len(s) > 0: # load all cached mappings
for key, value in json.loads(s):
pt1: Path = Path(key[0])
if (not pt1.is_file()) or (
not self.__git.base_dir.contains(pt1)):
continue
pt2: Path = Path(value)
if (not pt2.is_file()) or (
not self.__cache_dir.contains(pt2)):
continue
cmd1: tuple[str, ...] | None = _str_tuple(
key[1], True, True, False)
if cmd1 is None:
continue
self.__cache_mapped[(pt1, cmd1)] = pt2
#: the directory to store generated
self.__generated_dir: Final[Path] = self.base_path.resolve_inside(
".generated")
self.__generated_dir.ensure_dir_exists()
#: the mapping of command lines and generated resources
self.__generated_mapped: Final[dict[tuple[
Path | None, tuple[str, ...]], Path]] = {}
#: the generated list file
self.__generated_list: Final[Path] = (
self.__generated_dir.resolve_inside(".generated_list.json"))
#: load the generated list
self.__generated_list.ensure_file_exists()
if getsize(self.__generated_list) > 0: # file size > 0
s = self.__generated_list.read_all_str()
if len(s) > 0: # load all cached mappings
for key, value in json.loads(s):
pt3: Path | None = Path(key[0]) \
if key[0] is not None else None
if not ((pt3 is None) or self.__git.is_git_repo_path(
pt3)):
continue # purge dirs not assigned to git repos
cmd2: tuple[str, ...] | None = _str_tuple(
key[1], False, False, False)
if cmd2 is None:
continue # purge empty commands
pt4: Path = Path(value)
if (not pt4.is_file()) or (
not self.__generated_dir.contains(pt4)):
continue # purge invalid cache entries
self.__generated_mapped[(pt3, cmd2)] = pt4
[docs]
def get_file_and_url(
self, repo_url: str, relative_path: str,
processor: Iterable[str] | None = ()) -> tuple[Path, URL]:
"""
Get a specified, potentially pre-processed file.
:param repo_url: the repository url
:param relative_path: the relative path of the file
:param processor: the pre-processor commands
:return: the file and the url into the git repository of the original
"""
if not self.__is_open:
raise ValueError("already closed!")
if processor is None:
processor = ()
if not isinstance(processor, Iterable):
raise type_error(processor, "preprocessor", Iterable)
if not isinstance(repo_url, str):
raise type_error(repo_url, "repo_url", str)
if not isinstance(relative_path, str):
raise type_error(relative_path, "relative_path", str)
# first step: get source repository file
ps: Final[tuple[Path, URL]] = self.__git.get_file_and_url(
repo_url, relative_path)
path: Final[Path] = ps[0]
# second step: prepare postprocessing command
command: Final[tuple[str, ...]] = _str_tuple(
processor, True, True, True)
if len(command) <= 0: # no postprocessing command?
return path, ps[1] # then return path to git file directory
# so there is postprocessing to do: look up in cache
key: Final[tuple[Path, tuple[str, ...]]] = (path, command)
log_str: str = f"{path!r} via {' '.join(repr(c) for c in command)}"
if key in self.__cache_mapped: # found in cache?
pt: Path = self.__cache_mapped[key]
if pt.is_file():
logger(f"found cache entry {pt!r} for {log_str}.")
return pt, ps[1] # return path to cached file
# not in cache: create new file and apply post-processing
(handle, fpath) = mkstemp(prefix="proc_", dir=self.__cache_dir)
os.close(handle)
dest: Final[Path] = file_path(fpath)
log_str = f"from {log_str} to {dest!r}"
logger(f"will pipe data from {path!r} via {log_str}")
# execute the command
_write(Command(
command=command, working_dir=self.__cache_dir,
stdout=STREAM_CAPTURE, stdin=path.read_all_str()).execute(
True)[0], dest)
self.__cache_mapped[key] = dest # remember in cache
return dest, ps[1] # return path
[docs]
def get_output(
self, command: str | Iterable[str],
repo_url: str | None = None,
relative_dir: str | None = None) -> tuple[Path, URL | None]:
"""
Get the output of a certain command.
:param command: the command itself
:param repo_url: the optional repository URL
:param relative_dir: the optional directory inside the repository
where the command should be executed
:return: the path to the output and the url of the git repository,
if any
"""
if not self.__is_open:
raise ValueError("already closed!")
command = _str_tuple(command, False, False, True)
if isinstance(repo_url, str):
if str.__len__(repo_url) <= 0:
repo_url = None
elif repo_url is not None:
raise type_error(repo_url, "repo_url", (str, None))
if isinstance(relative_dir, str):
if str.__len__(relative_dir) <= 0:
relative_dir = None
elif relative_dir is not None:
raise type_error(relative_dir, "relative_dir", (str, None))
if (repo_url is None) != (relative_dir is None):
raise ValueError(f"repo_url and relative_dir must either both be "
f"None or neither, but they are {repo_url!r} "
f"and {relative_dir!r}.")
repo_path: Path | None = None
path: Path | None = None
url: URL | None = None
if repo_url is not None:
repo = self.__git.get_repo(repo_url)
repo_path = repo.path
path = repo_path.resolve_inside(relative_dir)
url = repo.url
key: tuple[Path | None, tuple[str, ...]] = (path, command)
log_str: str = f"{' '.join(repr(c) for c in command)} in {path!r}"
if key in self.__generated_mapped:
result: Path = self.__generated_mapped[key]
if result.is_file():
logger(f"found cache entry {result!r} for {log_str}.")
return result, url # return path to cached file
(handle, fpath) = mkstemp(prefix="gen_", dir=self.__generated_dir)
os.close(handle)
dest: Final[Path] = file_path(fpath)
log_str = f"from {log_str} to {dest!r}"
logger(f"will pipe data from {path!r} via {log_str}")
# Now we need to fix the command if we are running inside a virtual
# environment. If we are running inside a virtual environment, it is
# necessary to use the same Python interpreter that was used to run
# latexgit. We should also pass along all the Python-related
# environment parameters.
use_cmd: str | tuple[str, ...] = command
if isinstance(use_cmd, tuple) and (tuple.__len__(use_cmd) > 1) and (
str.lower(use_cmd[0]).startswith("python3")):
lcmd: list[str] = list(use_cmd)
lcmd[0] = PYTHON_INTERPRETER
use_cmd = tuple(lcmd)
# execute the command
output: str = Command(
command=use_cmd, working_dir=path, env=PYTHON_ENV,
stdout=STREAM_CAPTURE).execute(True)[0]
if repo_path is not None: # fix the base path
output = replace_base_path(output, repo_path)
_write(output, dest)
self.__generated_mapped[key] = dest
return dest, url
[docs]
def close(self) -> None:
"""Close the processed repository and write cache list."""
opn: bool = self.__is_open
self.__is_open = False
if opn: # only if we were open...
# flush or clear directory of cached post-processed files
if len(self.__cache_mapped) > 0: # we got cached files
self.__cache_list.write_all_str(json.dumps( # store cache
list(self.__cache_mapped.items())))
else: # no cache files? we can delete cache directory
rmtree(self.__cache_dir, ignore_errors=True)
# flush or clear directory of generated resources
if len(self.__generated_mapped) > 0: # we got generated files
self.__generated_list.write_all_str(json.dumps( # store cache
list(self.__generated_mapped.items())))
else: # no cache files? we can delete cache directory
rmtree(self.__generated_dir, ignore_errors=True)
def __exit__(self, exception_type, _, __) -> bool:
"""
Close the context manager.
:param exception_type: ignored
:param _: ignored
:param __: ignored
:returns: `True` to suppress an exception, `False` to rethrow it
"""
self.close() # close the manager and flush cache
return exception_type is None