Coverage for pycommons / processes / fork.py: 34%
86 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 03:11 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 03:11 +0000
1"""
2Fork a Python interpreter process multiple times.
4The use case for this tool is when you have a main script in your Python
5program and want to allow a user to launch it multiple times with the same
6parameters.
7In this case, she would simply provide the argument `--fork NNN`, where
8`NNN` is either the absolute number of times the program should be launched
9or a fraction in (0, 1) of the available logical cores to use.
11You would use the normal :class:`~argparse.ArgumentParser` in your main
12code piece, i.e., do something like
13`if __name__ == "__main__":` followed by
14` parser: Final[ArgumentParser] = ...`.
16Then, you would invoke `args = fork(parser)`, where :func:`fork` is the
17function provided here.
18This function will return `None` if it has detected the `--fork` argument.
19In that case, it will have launched the process again the appropriate
20number of times and waited for its completion already.
21If it did not detect the `--fork` argument, it will return the
22:class:`~argparse.Namespace` instance with the normal arguments.
23Actually, it will do exactly this in the launched copies of the process.
25So from the outside, this looks pretty much like the good old `fork`
26command on Unix systems.
27However, it can do a little bit more.
28For each process copy (except the original process), the `args.fork_id`
29will hold a unique ID of the forked process.
31Also, you can provide the command `--fork-log-dir YYY` to the original
32process. In this case, each of the forked processes will not be launched
33normally. Instead, its stdout and stderr will be piped into two different
34files inside the directory `YYY`. This makes working with experiments, like
35in the *moptipy* framework (see <https://thomasweise.github.io/moptipy>)
36easier.
38The gist is this: Under the hood, this looks as if you can do Unix-style
39forks of the Python interpreter. Actually, it launches separate interpreter
40processes with the same arguments (plus the `fork_id` parameter and minus
41the forking-arguments).
43Side note: If you do not provide a logging directory and the number of
44processes to launch would be 1, then no forking takes place. This then
45just returns the normal arguments as if no forking parameters were provided
46at all.
47"""
48import sys
49from argparse import ArgumentParser, Namespace
50from math import isfinite
51from os import cpu_count, environ, getpid
52from platform import node
53from typing import Final
55from pycommons.io.console import logger
56from pycommons.io.path import Path
57from pycommons.processes.multishell import multi_execute
58from pycommons.processes.python import PYTHON_ENV, PYTHON_INTERPRETER
59from pycommons.processes.shell import STREAM_FORWARD, Command
60from pycommons.processes.shell_to_file import to_files
61from pycommons.strings.string_tools import replace_str
62from pycommons.types import type_error
65def get_cores(use: int | float, n_cpu: int | None = None) -> int | None:
66 """
67 Compute the number of CPU cores to be used (for forking).
69 :param use: the usage number, either a float between 0 and 1 denoting a
70 fraction of cores to be used, or the absolute number.
71 :param n_cpu: the number of CPU cores available, or `None` if we should
72 determine it automatically.
73 :return: the number of cores
75 >>> get_cores(1)
76 1
77 >>> get_cores(2)
78 2
79 >>> get_cores(2.3)
80 2
81 >>> get_cores(2.6)
82 3
83 >>> get_cores(0.5, 10)
84 5
85 >>> get_cores(0.3, 10)
86 3
87 >>> get_cores(0.5, 16)
88 8
89 >>> get_cores(0.5, 1)
90 1
92 >>> 0 < get_cores(0.5) < 10000
93 True
95 >>> try:
96 ... get_cores("a")
97 ... except TypeError as te:
98 ... print(te)
99 use should be an instance of any in {float, int} but is str, namely 'a'.
101 >>> try:
102 ... get_cores(0.3, "a")
103 ... except TypeError as te:
104 ... print(te)
105 n_cpu should be an instance of int but is str, namely 'a'.
107 >>> try:
108 ... get_cores(-1)
109 ... except ValueError as v:
110 ... print(v)
111 Invalid value -1 for number of cores to use.
112 """
113 if not isinstance(use, int | float):
114 raise type_error(use, "use", (int, float))
115 if (not isfinite(use)) or (use <= 0) or (use > 1000):
116 raise ValueError(f"Invalid value {use!r} for number of cores to use.")
117 if use < 1:
118 if n_cpu is None:
119 n_cpu = cpu_count()
120 if n_cpu is None:
121 return 1
122 if not isinstance(n_cpu, int):
123 raise type_error(n_cpu, "n_cpu", int)
124 if n_cpu < 1:
125 raise ValueError(f"Invalid value {n_cpu!r} for n_cpu.")
126 return max(1, min(n_cpu - 1, round(use * n_cpu)))
127 return round(use)
def fork(parser: ArgumentParser) -> Namespace | None:
    """
    Launch this Python process multiple times if requested to.

    If the user provided an argument `--fork NNN`, where `NNN` is either an
    absolute number of processes to launch or a fraction in (0, 1) of logical
    CPU cores to use, then this function will invoke the interpreter the
    corresponding number of times with exactly the same command line arguments
    except the forking parameters.

    You can provide an argument `--fork-log-dir DDD`, where `DDD` is a
    directory. If you do this, then the stdout and stderr of each launched
    process are piped into files inside this directory.

    If forking is done, then each forked process gets an additional argument
    `fork_id` with a unique identifier.

    If no forking arguments are provided, or if we would fork just 1 process
    without logging directory, then no forking is done.
    In this case, this routine just returns the :class:`~argparse.Namespace`
    instance with the command line arguments.
    If this actually already is a forked process, then, too, the
    :class:`~argparse.Namespace` instance with the arguments (plus the
    `fork_id`) is returned.
    If this is the root process from which the forks were started, then `None`
    is returned. In that case, the function returns after all sub-processes
    are completed.

    :param parser: the root argument parser
    :return: `None` if the argument parser contained forking arguments and
        the same process was forked multiple times, otherwise the
        :class:`~argparse.Namespace` with the arguments.
    :raises TypeError: if `parser` is not an
        :class:`~argparse.ArgumentParser`.
    :raises ValueError: if the forking arguments are invalid, or if this
        process already is a fork and `--fork` was given again.
    """
    if not isinstance(parser, ArgumentParser):
        raise type_error(parser, "parser", ArgumentParser)

    # Register the forking-related arguments on the user's parser.
    arg_fork: Final[str] = "--fork"
    arg_log_dir: Final[str] = f"{arg_fork}-log-dir"
    arg_fork_id: Final[str] = f"{arg_fork}-id"
    parser.add_argument(
        arg_fork, nargs="?", type=float,
        help=("invoke this process multiple times in parallel. --fork can "
              "either be an absolute number of processes to launch or a "
              "fraction in (0, 1) of the available logical CPU cores."))
    parser.add_argument(arg_log_dir, nargs="?", type=Path)
    parser.add_argument(arg_fork_id, nargs="?", type=int)
    args: Namespace = parser.parse_args()

    if args.fork is None:  # no --fork given: behave like a normal process
        return args
    if args.fork_id is not None:  # a fork must not fork again
        raise ValueError("Cannot recursively fork.")

    # Get number of processes.
    try:
        nfork: int = get_cores(args.fork)
    except ValueError as ve:
        raise ValueError(f"Invalid value {args.fork} for {arg_fork}.") from ve

    ld: Path | None = args.fork_log_dir
    if nfork <= 1:
        if ld is None:
            logger("No need to fork: There would be just 1 process "
                   "and no log directory.")
            return args
        nfork = 1  # still fork once, so the output lands in the log dir

    msg: str = f"Forking {nfork} processes"
    if ld is None:
        msg = f"{msg} without specific log directory."
    else:
        msg = f"{msg} with log directory {ld!r}."
    logger(msg)

    # Build the child argument list: the original argv minus the forking
    # parameters, prefixed with the interpreter, suffixed with "--fork-id".
    # NOTE(review): this assumes the flags were passed as separate argv
    # entries ("--fork 3"); the fused "--fork=3" form would make index()
    # raise ValueError -- confirm whether that form must be supported.
    fork_args: list[str] = list(sys.argv)
    i: int = fork_args.index(arg_fork)
    del fork_args[i]  # remove the "--fork" flag ...
    del fork_args[i]  # ... and its value
    if ld is not None:
        i = fork_args.index(arg_log_dir)
        del fork_args[i]  # remove the "--fork-log-dir" flag ...
        del fork_args[i]  # ... and its value
    fork_args.insert(0, PYTHON_INTERPRETER)
    fork_args.append(arg_fork_id)

    # The children inherit our environment plus the Python-specific vars.
    use_env: Final[dict[str, str]] = dict(environ)
    use_env.update(PYTHON_ENV)

    # Build one command per process copy. Each command receives its OWN
    # list object: the previous code appended the fork ID to a single
    # shared list and deleted it again after constructing each Command,
    # which would break if Command stored the sequence by reference
    # instead of copying it.
    commands: Final[list[Command]] = [Command(
        command=[*fork_args, str(i)],
        stdout=STREAM_FORWARD,
        stderr=STREAM_FORWARD,
        env=use_env) for i in range(nfork)]

    if ld is not None:
        # Redirect each process' stdout/stderr into files in the log dir,
        # named "<host>_<pid>_fork_<i>_{stdout,stderr}.txt" after
        # sanitizing characters that are awkward in file names.
        ld.ensure_dir_exists()
        prefix = str.lower(f"{node()}_{getpid()}_fork_")
        for ch in "- \\/\t.+*%!#":
            prefix = str.replace(prefix, ch, "_")
        prefix = replace_str("__", "_", prefix)
        for i, cmd in enumerate(commands):
            commands[i] = to_files(
                command=cmd,
                stdout=ld.resolve_inside(f"{prefix}{i}_stdout.txt"),
                stderr=ld.resolve_inside(f"{prefix}{i}_stderr.txt"))

    multi_execute(commands, True)  # run all forks, wait for completion
    return None