pycommons.processes package

Utilities for dealing with processes.

Submodules

pycommons.processes.caller module

Get information about how this process was called.

pycommons.processes.caller.is_build()[source]

Check if the program was run inside a build.

This function returns True if the process is running inside a make build, if is_ci_run() is True, or if the environment variable BUILD_SCRIPT is set.

Since we now need to use virtual environments to install pip packages, using make scripts has become too cumbersome for me. I simply cannot be bothered to figure out how to set up a virtual environment for an entire make script. Instead, I now use a bash script (make.sh) in my builds. So that this can be detected properly, the script sets the environment variable BUILD_SCRIPT. In all my pycommons-based projects, I will do this from now on.

Basically, if you want to signal that code runs inside a build, you can set an environment variable via export BUILD_SCRIPT="${BASH_SOURCE[0]}" inside your bash build script. This function uses that variable as the signal that we are running inside a build.

Return type:

bool

Returns:

True if this process is executed as part of a build process, False otherwise.

>>> isinstance(is_build(), bool)
True
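
The BUILD_SCRIPT signal described above can be sketched with the standard library alone. This is a minimal illustration and not the pycommons implementation: the helper name has_build_script_marker is hypothetical, and it covers only the environment-variable check, not the make or CI detection.

```python
import os
from collections.abc import Mapping


def has_build_script_marker(environ: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical helper: report whether BUILD_SCRIPT is set."""
    # Only the presence of the variable matters in this sketch; how
    # pycommons treats an empty value is not stated in the documentation.
    return "BUILD_SCRIPT" in environ


print(has_build_script_marker({"BUILD_SCRIPT": "./make.sh"}))  # True
print(has_build_script_marker({}))  # False
```

Passing the environment as a parameter keeps the check testable without modifying the real process environment.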
pycommons.processes.caller.is_ci_run()[source]

Check if the program runs in a continuous integration environment.

Right now, only GitHub actions are recognized. Other CI tools are currently not supported.

Return type:

bool

Returns:

True if this process is executed as part of, e.g., a GitHub action, False otherwise.

>>> isinstance(is_ci_run(), bool)
True
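
One plausible way to recognize a GitHub Actions run is the GITHUB_ACTIONS environment variable, which GitHub sets to true on its runners. The sketch below assumes that this variable is the signal; whether is_ci_run() inspects exactly this variable is not stated here, and the helper name looks_like_github_actions is hypothetical.

```python
import os
from collections.abc import Mapping


def looks_like_github_actions(
        environ: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical helper: check the GITHUB_ACTIONS marker variable."""
    # Assumption: the variable holds the string "true" inside a workflow.
    return environ.get("GITHUB_ACTIONS", "").strip().lower() == "true"


print(looks_like_github_actions({"GITHUB_ACTIONS": "true"}))  # True
print(looks_like_github_actions({}))  # False
```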
pycommons.processes.caller.is_doc_test()[source]

Check if this process was invoked by a unit doctest.

Return type:

bool

Returns:

True if this function was called by a unit doctest, False otherwise

>>> is_doc_test()
True

pycommons.processes.fork module

Fork a Python interpreter process multiple times.

The use case for this tool is when you have a main script in your Python program and want to allow a user to launch it multiple times with the same parameters. In this case, she would simply provide the argument --fork NNN, where NNN is either the absolute number of times the program should be launched or a fraction in (0, 1) of the available logical cores to use.

You would use the normal ArgumentParser in your main code piece, i.e., do something like if __name__ == "__main__": followed by parser: Final[ArgumentParser] = ….

Then, you would invoke args = fork(parser), where fork() is the function provided here. This function will return None if it has detected the --fork argument. In that case, it will already have launched the process again the appropriate number of times and waited for the copies to complete. If it did not detect the --fork argument, it will return the Namespace instance with the normal arguments. This is also exactly what it does in the launched copies of the process.

So from the outside, this looks pretty much like the good old fork command on Unix systems. However, it can do a little bit more. For each process copy (except the original process), the args.fork_id will hold a unique ID of the forked process.

Also, you can provide the argument --fork-log-dir YYY to the original process. In this case, each of the forked processes will not be launched normally. Instead, its stdout and stderr will be piped into two different files inside the directory YYY. This makes working with experiments, like in the moptipy framework (see <https://thomasweise.github.io/moptipy>), easier.

The gist is this: Under the hood, this looks as if you can do Unix-style forks of the Python interpreter. Actually, it launches separate interpreter processes with the same arguments (plus the fork_id parameter and minus the forking-arguments).

Side note: If you do not provide a logging directory and the number of processes to launch would be 1, then no forking takes place. This then just returns the normal arguments as if no forking parameters were provided at all.
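
The relaunching idea described above (same arguments, minus the forking arguments, plus an ID per copy) can be sketched as a small pure function. This is only an illustration under stated assumptions: the helper child_commands is hypothetical, the flag name --fork-id is a guess at how the fork_id value might be passed, and the --flag=value spelling is not handled.

```python
import sys


def child_commands(argv: list[str], copies: int) -> list[list[str]]:
    """Hypothetical helper: build one command line per process copy."""
    kept: list[str] = []
    skip_next = False
    for arg in argv:
        if skip_next:  # this is the value that followed a forking flag
            skip_next = False
            continue
        if arg in ("--fork", "--fork-log-dir"):
            skip_next = True  # drop the flag and its value
            continue
        kept.append(arg)
    # "--fork-id" is an assumed flag name, used here for illustration only.
    return [[sys.executable, *kept, "--fork-id", str(i)]
            for i in range(copies)]


for cmd in child_commands(["main.py", "--fork", "3", "--seed", "1"], 3):
    print(cmd[1:])
```

Each resulting list could then be handed to a process launcher; the original process would wait for all copies before returning.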

pycommons.processes.fork.fork(parser)[source]

Launch this Python process multiple times if requested to.

If the user provided an argument --fork NNN, where NNN is either an absolute number of processes to launch or a fraction in (0, 1) of logical CPU cores to use, then this function will invoke the interpreter the corresponding number of times with exactly the same command line arguments except the forking parameters.

You can provide an argument --fork-log-dir DDD, where DDD is a directory. If you do this, then the stdout and stderr of each launched process are piped into files inside this directory.

If forking is done, then each forked process gets an additional argument fork_id with a unique identifier.

If no forking arguments are provided, or if we would fork just 1 process without a logging directory, then no forking is done. In this case, this routine just returns the Namespace instance with the command line arguments. If this already is a forked process, then the Namespace instance with the arguments (plus the fork_id) is returned as well. If this is the root process from which the forks were started, then None is returned. In that case, the function returns after all sub-processes are completed.

Parameters:

parser (ArgumentParser) – the root argument parser

Return type:

Namespace | None

Returns:

None if the argument parser contained forking arguments and the same process was forked multiple times, otherwise the Namespace with the arguments.

pycommons.processes.fork.get_cores(use, n_cpu=None)[source]

Compute the number of CPU cores to be used (for forking).

Parameters:
  • use (int | float) – the usage number, either a float between 0 and 1 denoting a fraction of cores to be used, or the absolute number.

  • n_cpu (int | None, default: None) – the number of CPU cores available, or None if we should determine it automatically.

Return type:

int | None

Returns:

the number of cores

>>> get_cores(1)
1
>>> get_cores(2)
2
>>> get_cores(2.3)
2
>>> get_cores(2.6)
3
>>> get_cores(0.5, 10)
5
>>> get_cores(0.3, 10)
3
>>> get_cores(0.5, 16)
8
>>> get_cores(0.5, 1)
1
>>> 0 < get_cores(0.5) < 10000
True
>>> try:
...     get_cores("a")
... except TypeError as te:
...     print(te)
use should be an instance of any in {float, int} but is str, namely 'a'.
>>> try:
...     get_cores(0.3, "a")
... except TypeError as te:
...     print(te)
n_cpu should be an instance of int but is str, namely 'a'.
>>> try:
...     get_cores(-1)
... except ValueError as v:
...     print(v)
Invalid value -1 for number of cores to use.

pycommons.processes.multishell module

A tool for running multiple commands in parallel.

>>> from pycommons.processes.shell import STREAM_CAPTURE
>>> c1 = Command(("echo", "123"), stdout=STREAM_CAPTURE)
>>> c2 = Command(("echo", "abc"), stdout=STREAM_CAPTURE)
>>> multi_execute((c1, c2), True)
(('123\n', None), ('abc\n', None))
>>> multi_execute((c1, ), False)
(('123\n', None),)
>>> multi_execute((c1, c2, c2, c2), True)
(('123\n', None), ('abc\n', None), ('abc\n', None), ('abc\n', None))
pycommons.processes.multishell.multi_execute(commands, log=True)[source]

Execute multiple commands in parallel.

Parameters:
  • commands (Iterable[Command]) – the iterable of the commands to execute

  • log (bool, default: True) – shall the execution state be logged?

Return type:

tuple[tuple[str | None, str | None], ...]

Returns:

the results of the commands

>>> from pycommons.processes.shell import STREAM_CAPTURE
>>> c1 = Command(("echo", "123"), stdout=STREAM_CAPTURE)
>>> c2 = Command(("echo", "abc"), stdout=STREAM_CAPTURE)
>>> multi_execute((), False)
()
>>> multi_execute((), True)
()
>>> multi_execute((c1, ), False)
(('123\n', None),)
>>> multi_execute((c1, ), True)
(('123\n', None),)
>>> multi_execute((c1, c2), False)
(('123\n', None), ('abc\n', None))
>>> multi_execute((c1, c2), True)
(('123\n', None), ('abc\n', None))
>>> multi_execute((c1, c2, c2, c2), True)
(('123\n', None), ('abc\n', None), ('abc\n', None), ('abc\n', None))
>>> multi_execute((c1, c2, c2, c2), False)
(('123\n', None), ('abc\n', None), ('abc\n', None), ('abc\n', None))
>>> try:
...     multi_execute(1)
... except TypeError as te:
...     print(te)
commands should be an instance of typing.Iterable but is int, namely 1.
>>> try:
...     multi_execute((c1, c2), 3)
... except TypeError as te:
...     print(te)
log should be an instance of bool but is int, namely 3.
>>> try:
...     multi_execute(("x", ))
... except TypeError as te:
...     print(str(te)[:20])
commands[0] should b
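
For readers who want to see the general idea without the Command class, here is a minimal standard-library sketch of running several commands concurrently and collecting their outputs in input order. It is not how multi_execute() is implemented; the helper names run_captured and run_all are hypothetical, and a thread pool is simply one convenient way to wait on several child processes.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_captured(cmd: list[str]) -> tuple[str, None]:
    """Run one command and capture its stdout as text."""
    done = subprocess.run(cmd, stdout=subprocess.PIPE, text=True, check=True)
    return done.stdout, None  # stderr is not captured in this sketch


def run_all(cmds: list[list[str]]) -> tuple[tuple[str, None], ...]:
    """Run all commands concurrently, keeping the input order."""
    if not cmds:
        return ()  # a pool with zero workers would be invalid
    with ThreadPoolExecutor(max_workers=len(cmds)) as pool:
        return tuple(pool.map(run_captured, cmds))


# The interpreter itself is used instead of echo to stay portable.
c1 = [sys.executable, "-c", "print(123)"]
c2 = [sys.executable, "-c", "print('abc')"]
print(run_all([c1, c2]))  # (('123\n', None), ('abc\n', None))
```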

pycommons.processes.python module

Some utilities for dealing with python.

>>> PYTHON_INTERPRETER.is_file()
True
>>> PYTHON_INTERPRETER_SHORT
'python3'
>>> len(__BASE_PATHS) > 0
True
>>> all((isinstance(f, Path) for f in __BASE_PATHS))
True
>>> all((len(__BASE_PATHS[i]) >= len(__BASE_PATHS[i + 1])
...      for i in range(len(__BASE_PATHS) - 1)))
True
pycommons.processes.python.PYTHON_ENV: Final[Mapping[str, str]] = mappingproxy({'PATH': '/tmp/tmp.tWDX4Z0jlb/bin:/opt/hostedtoolcache/Python/3.12.12/x64/bin:/opt/hostedtoolcache/Python/3.12.12/x64:/snap/bin:/home/runner/.local/bin:/opt/pipx_bin:/home/runner/.cargo/bin:/home/runner/.config/composer/vendor/bin:/usr/local/.ghcup/bin:/home/runner/.dotnet/tools:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin', 'PYTHONPATH': '/home/runner/work/pycommons/pycommons', 'PYTHON_INTERPRETER': '/tmp/tmp.tWDX4Z0jlb/bin/python3', 'VIRTUAL_ENV': '/tmp/tmp.tWDX4Z0jlb'})

The environment variables related to Python that were set in the current process. It makes sense to pass these on with any python_command() invocation or other calls to the Python interpreter. This collection includes information about the Python interpreter, executable, PATH, and the virtual environment, if any, as well as any Python-related environment variables passed to this process. The special variable PYTHON_INTERPRETER will be passed into this environment. If it already exists in this process’ environment, it will be passed along as-is. If it does not exist in the current environment, it is created and made to point to the Python executable that was used to launch this process.
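
The behavior described above can be approximated as follows. This is a sketch under assumptions: the helper python_related_env is hypothetical, and the exact set of variables that pycommons collects is not documented here, so the tuple of names below is only inferred from the value shown above.

```python
import os
import sys
from collections.abc import Mapping
from types import MappingProxyType


def python_related_env(
        environ: Mapping[str, str] = os.environ) -> Mapping[str, str]:
    """Hypothetical helper: collect Python-related environment variables."""
    # Assumed selection of variable names, taken from the example value.
    keep = ("PATH", "PYTHONPATH", "VIRTUAL_ENV", "PYTHON_INTERPRETER")
    env = {key: environ[key] for key in keep if key in environ}
    # Pass an existing PYTHON_INTERPRETER along as-is, otherwise create it.
    env.setdefault("PYTHON_INTERPRETER", sys.executable)
    return MappingProxyType(env)  # read-only view, like a mappingproxy


env = python_related_env({"PATH": "/usr/bin", "HOME": "/home/me"})
print(sorted(env))  # ['PATH', 'PYTHON_INTERPRETER']
```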

pycommons.processes.python.PYTHON_INTERPRETER: Final[Path] = '/tmp/tmp.tWDX4Z0jlb/bin/python3'

the Python interpreter used to launch this program

pycommons.processes.python.PYTHON_INTERPRETER_SHORT: Final[str] = 'python3'

The python interpreter in short form.

pycommons.processes.python.python_command(file, use_short_interpreter=True)[source]

Get a python command that could be used to interpret the given file.

This function tries to detect whether file identifies a Python module of an installed package, in which case it will issue a -m flag in the resulting command, or whether it is some other script, in which case it will just return a normal interpreter invocation.

Notice that you should forward PYTHON_ENV as environment to the new Python process if it uses any packages. If we are currently running in a virtual environment, we want to tell this command about that.

Parameters:
  • file (str) – the python script

  • use_short_interpreter (bool, default: True) – use the short interpreter path, for readability and maybe portability, or the full path?

Return type:

list[str]

Returns:

a list that can be passed to the shell to run that program, see, e.g., pycommons.processes.shell.Command.

>>> python_command(os.__file__)
['python3', '-m', 'os']
>>> python_command(__file__)
['python3', '-m', 'pycommons.processes.python']
>>> from tempfile import mkstemp
>>> from os import remove as osremovex
>>> from os import close as osclosex
>>> h, p = mkstemp(text=True)
>>> osclosex(h)
>>> python_command(p) == [PYTHON_INTERPRETER_SHORT, p]
True
>>> python_command(p, False) == [PYTHON_INTERPRETER, p]
True
>>> osremovex(p)
>>> h, p = mkstemp(dir=file_path(__file__).up(), text=True)
>>> osclosex(h)
>>> python_command(p) == [PYTHON_INTERPRETER_SHORT, p]
True
>>> python_command(p, False) == [PYTHON_INTERPRETER, p]
True
>>> osremovex(p)
>>> the_pack = file_path(__file__).up()
>>> h, p = mkstemp(dir=the_pack,
...                suffix=".py", text=True)
>>> osclosex(h)
>>> the_str = p[len(the_pack.up(2)) + 1:-3].replace(os.sep, '.')
>>> python_command(p) == [PYTHON_INTERPRETER_SHORT, "-m", the_str]
True
>>> python_command(p, False) == [PYTHON_INTERPRETER, "-m", the_str]
True
>>> osremovex(p)
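
The module-versus-script decision illustrated by the doctests can be sketched as a path computation. The sketch assumes that the directory containing the top-level package is already known and passed in as package_root; python_command() finds such base paths on its own, and how it does so is not shown here. The helper name module_command is hypothetical, and POSIX-style paths are used to keep the example deterministic.

```python
from pathlib import PurePosixPath


def module_command(file: str, package_root: str,
                   interpreter: str = "python3") -> list[str]:
    """Hypothetical helper: choose between '-m module' and a plain call."""
    path = PurePosixPath(file)
    root = PurePosixPath(package_root)
    if path.suffix == ".py" and path.is_relative_to(root):
        # pkg/sub/mod.py below the root becomes the dotted name pkg.sub.mod
        module = ".".join(path.relative_to(root).with_suffix("").parts)
        return [interpreter, "-m", module]
    return [interpreter, file]  # some other script: run it directly


print(module_command("/src/pycommons/processes/python.py", "/src"))
print(module_command("/tmp/script.txt", "/src"))
```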

pycommons.processes.shell module

The tool for invoking shell commands.

>>> Command(("echo", "123"), stdout=STREAM_CAPTURE).execute(False)
('123\n', None)
class pycommons.processes.shell.Command(command, working_dir=None, timeout=3600, stdin=None, stdout=0, stderr=0, env=None)[source]

Bases: object

A class that represents a command that can be executed.

>>> c = Command("test")
>>> c.command
('test',)
>>> c.working_dir.is_dir()
True
>>> c.timeout
3600
>>> d = Command(("test", "b"))
>>> d.command
('test', 'b')
>>> d.working_dir == c.working_dir
True
>>> d.timeout == c.timeout
True
>>> e = Command(("", "test", " b", " "))
>>> e.command == d.command
True
>>> e.working_dir == c.working_dir
True
>>> e.timeout == c.timeout
True
>>> try:
...     Command(1)
... except TypeError as te:
...     print(str(te)[:50])
command should be an instance of any in {str, typi
>>> try:
...     Command([1])
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     Command(["x", 1])
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     Command([])
... except ValueError as ve:
...     print(ve)
Invalid command [].
>>> try:
...     Command([""])
... except ValueError as ve:
...     print(ve)
Invalid command [''].
>>> try:
...     Command("")
... except ValueError as ve:
...     print(ve)
Invalid command [''].
>>> Command("x", working_dir=Path(__file__).up(1)).command
('x',)
>>> try:
...     Command("x", working_dir=1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     Command("x", working_dir=Path(__file__))
... except ValueError as ve:
...     print(str(ve)[-30:])
does not identify a directory.
>>> Command("x", timeout=23).timeout
23
>>> try:
...     Command("x", timeout=1.2)
... except TypeError as te:
...     print(te)
timeout should be an instance of int but is float, namely 1.2.
>>> try:
...     Command("x", timeout=None)
... except TypeError as te:
...     print(te)
timeout should be an instance of int but is None.
>>> try:
...     Command("x", timeout=0)
... except ValueError as ve:
...     print(ve)
timeout=0 is invalid, must be in 1..1000000.
>>> try:
...     Command("x", timeout=1_000_001)
... except ValueError as ve:
...     print(ve)
timeout=1000001 is invalid, must be in 1..1000000.
>>> try:
...     Command("x", stdin=1_000_001)
... except TypeError as te:
...     print(str(te)[:49])
stdin should be an instance of any in {None, str}
>>> sxx = str(Command("x", env={"A": "B", "C": "D"}))
>>> sxx[sxx.index("with "):sxx.index("with ") + 30]
'with <env> no stdin, stdout ig'
>>> try:
...     Command("x", env={"A": "B", "C": 1})
... except TypeError as te:
...     print(str(te))
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     Command("x", env=1)
... except TypeError as te:
...     print(str(te)[:-20])
env should be an instance of any in {typing.Iterable, typing.Mapping} b
>>> str(Command("x", env=dict()))[0:10]
"('x',) in "
command: tuple[str, ...]

the command line.

env: tuple[tuple[str, str], ...] | None

the environment variables to pass to the new process, if any

execute(log_call=True)[source]

Execute the given process.

Parameters:

log_call (bool, default: True) – should the call be logged? If True, the string representation of the Command will be written to the logger, otherwise nothing is logged. Note: The environment, if any, will not be printed for security reasons.

Return type:

tuple[str | None, str | None]

Returns:

a tuple with the standard output and standard error, each of which is None unless the corresponding stream was captured

Raises:
  • TypeError – if any argument has the wrong type

  • ValueError – if execution of the process failed

>>> Command(("echo", "123"), stdout=STREAM_CAPTURE).execute(False)
('123\n', None)
>>> Command(("echo", "", "123"), stdout=STREAM_CAPTURE).execute(False)
('123\n', None)
>>> from contextlib import redirect_stdout
>>> with redirect_stdout(None):
...     s = Command(("echo", "123"), stdout=STREAM_CAPTURE).execute()
>>> print(s)
('123\n', None)
>>> Command("cat", stdin="test", stdout=STREAM_CAPTURE).execute(False)
('test', None)
>>> Command("cat", stdin="test").execute(False)
(None, None)
>>> try:
...     with redirect_stdout(None):
...         Command(("ping", "blabla!")).execute(True)
... except ValueError as ve:
...     ss = str(ve)
...     print(ss[:20] + " ... " + ss[-22:])
('ping', 'blabla!')  ...  yields return code 2.
>>> try:
...     with redirect_stdout(None):
...         Command(("ping", "www.example.com", "-i 20"),
...                 timeout=1).execute(True)
... except ValueError as ve:
...     print("timed out after" in str(ve))
True
>>> try:
...     Command("x").execute(None)
... except TypeError as te:
...     print(te)
log_call should be an instance of bool but is None.
>>> try:
...     Command("x").execute(1)
... except TypeError as te:
...     print(te)
log_call should be an instance of bool but is int, namely 1.
>>> with redirect_stdout(None):
...     r = Command(("echo", "1"), stderr=STREAM_CAPTURE).execute(
...             True)
>>> r
(None, '')
>>> with redirect_stdout(None):
...     r = Command(("printenv", ),
...                 stdout=STREAM_CAPTURE,
...                 env={"BLA": "XX"}).execute(True)
>>> r
('BLA=XX\n', None)
stderr: int

how to handle the standard error stream

stdin: str | None

the data to be written to stdin

stdout: int

how to handle the standard output stream

timeout: int

the timeout in seconds, after which the process will be terminated

working_dir: Path

the working directory

pycommons.processes.shell.STREAM_CAPTURE: Final[int] = 2

capture the given stream

pycommons.processes.shell.STREAM_FORWARD: Final[int] = 1

forward given stream to the same stream of this process

pycommons.processes.shell.STREAM_IGNORE: Final[int] = 0

ignore the given stream
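
To make the three stream modes concrete, the following sketch maps them onto the standard subprocess module. The mapping (ignore to DEVNULL, forward to inheriting the parent's stream, capture to PIPE) is an assumption about what the modes mean, not a description of how Command.execute() is implemented, and the function name run is hypothetical.

```python
import subprocess
import sys

STREAM_IGNORE, STREAM_FORWARD, STREAM_CAPTURE = 0, 1, 2

# Assumed translation of the stream modes into subprocess arguments.
_MODES = {
    STREAM_IGNORE: subprocess.DEVNULL,  # discard the output
    STREAM_FORWARD: None,  # inherit the stream of this process
    STREAM_CAPTURE: subprocess.PIPE,  # collect the output as text
}


def run(cmd, stdin=None, stdout=STREAM_IGNORE, stderr=STREAM_IGNORE,
        timeout=3600):
    """Hypothetical helper: run cmd and return (stdout, stderr)."""
    try:
        done = subprocess.run(
            cmd, input=stdin, stdout=_MODES[stdout], stderr=_MODES[stderr],
            text=True, timeout=timeout, check=False)
    except subprocess.TimeoutExpired as te:
        raise ValueError(f"{cmd!r} timed out after {timeout}s.") from te
    if done.returncode != 0:
        raise ValueError(f"{cmd!r} yields return code {done.returncode}.")
    return done.stdout, done.stderr  # None for streams not captured


print(run([sys.executable, "-c", "print(123)"], stdout=STREAM_CAPTURE))
```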

pycommons.processes.shell_to_file module

A tool for invoking shell commands and piping their output to files.

To do this more or less safely and reliably, we create a script that invokes the original command. The script is created as a temporary file and will be deleted after the command completes.

>>> from pycommons.io.temp import temp_dir
>>> cmd = Command(("echo", "123"))
>>> with temp_dir() as td:
...     so = td.resolve_inside("so.txt")
...     se = td.resolve_inside("se.txt")
...     to_files(cmd, so, se).execute()
...     print(f"so: {so.read_all_str()}")
(None, None)
so: 123
>>> with temp_dir() as td:
...     so = td.resolve_inside("so.txt")
...     to_files(cmd, so, so).execute()
...     print(f"so: {so.read_all_str()}")
(None, None)
so: 123
>>> try:
...     to_files("a", "a", "b")
... except TypeError as te:
...     print(str(te)[:10])
command sh
>>> try:
...     to_files(cmd, 1, "b")
... except TypeError as te:
...     print(te)
stdout should be an instance of any in {None, str} but is int, namely 1.
>>> try:
...     to_files(cmd, "a", 1)
... except TypeError as te:
...     print(te)
stderr should be an instance of any in {None, str} but is int, namely 1.
>>> try:
...     to_files(cmd, None, None)
... except ValueError as ve:
...     print(ve)
Either stdout or stderr must be specified.
pycommons.processes.shell_to_file.to_files(command, stdout, stderr)[source]

Take an existing command and forward its stdout and/or stderr to files.

Currently, providing text as standard input is not supported. You can provide either different or the same file for the standard output and standard error. If the same file is provided, then both streams will be merged into that file. Either way, the files you provide will be created and overwritten during the command execution. Notice that whatever original settings for standard error and standard output you provided in the original Command instance command will be ignored.

Parameters:
  • command (Command) – the command

  • stdout (str | None) – the file to capture the stdout, or None if stdout should be ignored

  • stderr (str | None) – the file to capture the stderr, or None if stderr should be ignored

Return type:

Command

Returns:

the new command

pycommons.processes.system_state module

Functionality to log the current system state.

Here we provide a small program that can be executed concurrently with other activities and that logs information about the system state. This may be useful when running some computationally heavy experiments to find potential problems.

pycommons.processes.system_state.collect_system_state(collector)[source]

Collect the current state of the system and pass it to the collector as key-value pairs.

Parameters:

collector (Callable[[str, str], Any]) – the collector to receive the key-value tuples

Return type:

None

>>> def __ptr(a: str, b: str) -> None:
...     pass
>>> s = collect_system_state(__ptr)
>>> try:
...     collect_system_state(None)
... except TypeError as te:
...     print(te)
collector should be a callable but is None.
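
The collector is simply a callable that receives one key and one value per call, so a dictionary's item setter works as a collector. The sketch below shows the calling convention with a few standard-library measurements. The function collect_basic_state and all key names in it are hypothetical and much more limited than whatever collect_system_state() actually reports.

```python
import os
import shutil
from datetime import datetime, timezone
from typing import Any, Callable


def collect_basic_state(collector: Callable[[str, str], Any]) -> None:
    """Hypothetical helper: feed a few key-value pairs to the collector."""
    if not callable(collector):
        raise TypeError(f"collector should be a callable but is {collector}.")
    collector("now", datetime.now(timezone.utc).isoformat())
    collector("cpu.count", str(os.cpu_count() or 1))
    usage = shutil.disk_usage(os.getcwd())  # disk holding the working dir
    collector("disk.total", str(usage.total))
    collector("disk.free", str(usage.free))


state: dict[str, str] = {}
collect_basic_state(state.__setitem__)  # store each pair in the dict
print(sorted(state))
```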
pycommons.processes.system_state.log_system_state(interval_seconds=300, should_stop=<function <lambda>>, lock=<contextlib.nullcontext object>)[source]

Log the system state periodically to the stdout.

This function allows for periodic logging of the system state to the standard output. This can be launched as a program running alongside an experiment in order to help track down potential problems. Let’s say that your experiment or whatever program crashes for unclear reasons. Why did it crash? We don’t know. Maybe it crashed because it ran out of memory. Maybe it ran out of disk space? Maybe not? Who knows. If you let this function run concurrently with your program and pipe its output to a log file, then at least you will be able to see if the system slowly runs out of memory or disk space, or if the CPU gets too hot, or something. Or, at least, you can rule out that this is the case.

The output is presented in CSV format. Therefore, you can pipe it to a file and later open it in Excel or whatever. This allows you to draw diagrams of the usage of CPUs and memory or the temperature of the CPU over time.

Parameters:
  • interval_seconds (int, default: 300) – the interval in seconds between two log entries

  • should_stop (Callable[[], bool], default: <function <lambda>>) – a function telling the logger when it should stop

  • lock (AbstractContextManager, default: <contextlib.nullcontext object>) – a shared lock for the console access

Return type:

None

# Example:

>>> from contextlib import redirect_stdout
>>> from io import StringIO
>>> sio = StringIO()
>>> def __three(lst=[1, 2, 3, 4, 5, 6]) -> bool:
...     if list.__len__(lst) > 0:
...         del lst[-1]
...         return False
...     return True
>>> with redirect_stdout(sio):
...     log_system_state(1, __three)
>>> v = sio.getvalue().splitlines()
>>> len(v)
4
>>> v[0][:20]
'now;now.year;now.mon'
>>> i = list.__len__(v[0].split(CSV_SEPARATOR))
>>> all(list.__len__(vv.split(CSV_SEPARATOR)) == i for vv in v)
True
>>> try:
...     log_system_state(1, lock=None)
... except TypeError as te:
...     print(str(te)[0:60])
lock should be an instance of contextlib.AbstractContextMana
>>> try:
...     log_system_state(1, should_stop=None)
... except TypeError as te:
...     print(te)
should_stop should be a callable but is None.
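
The interplay of the three parameters (interval, stop predicate, console lock) can be sketched as a small loop that prints one CSV row per round. This is only an illustration: log_periodically and its sample argument are hypothetical, the semicolon separator is inferred from the header shown in the doctest above, and the real function may query should_stop at different moments than this loop does.

```python
import time
from contextlib import AbstractContextManager, nullcontext
from typing import Callable

CSV_SEPARATOR = ";"  # assumed from the 'now;now.year;...' header above


def log_periodically(sample: Callable[[], dict[str, str]],
                     interval_seconds: int = 300,
                     should_stop: Callable[[], bool] = lambda: False,
                     lock: AbstractContextManager = nullcontext()) -> None:
    """Hypothetical helper: print a CSV header once, then one row per round."""
    header_done = False
    while not should_stop():
        row = sample()  # take one measurement as key-value pairs
        with lock:  # keep concurrent console writers from interleaving
            if not header_done:
                print(CSV_SEPARATOR.join(row.keys()))
                header_done = True
            print(CSV_SEPARATOR.join(row.values()))
        time.sleep(interval_seconds)


rounds = iter([False, False, True])  # stop after two rows
log_periodically(lambda: {"cpu": "1", "mem": "2"}, 0, lambda: next(rounds))
```

Passing a threading.Lock as lock would let several threads share the console safely, since a lock is also a context manager.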