Coverage for pycommons / processes / fork.py: 34%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 03:11 +0000

1""" 

2Fork a Python interpreter process multiple times. 

3 

4The use case for this tool is when you have a main script in your Python 

5program and want to allow a user to launch it multiple times with the same

6parameters. 

7In this case, she would simply provide the argument `--fork NNN`, where 

8`NNN` is either the absolute number of times the program should be launched 

9or a fraction in (0, 1) of the available logical cores to use. 

10 

11You would use the normal :class:`~argparse.ArgumentParser` in your main 

12code piece, i.e., do something like 

13`if __name__ == "__main__":` followed by 

14` parser: Final[ArgumentParser] = ...`. 

15 

16Then, you would invoke `args = fork(parser)`, where :func:`fork` is the 

17function provided here. 

18This function will return `None` if it has detected the `--fork` argument. 

19In that case, it will have launched the process again the appropriate 

20number of times and waited for its completion already. 

21If it did not detect the `--fork` argument, it will return the 

22:class:`~argparse.Namespace` instance with the normal arguments. 

23Actually, it will do exactly this in the launched copies of the process. 

24 

25So from the outside, this looks pretty much like the good old `fork` 

26command on Unix systems. 

27However, it can do a little bit more. 

28For each process copy (except the original process), the `args.fork_id` 

29will hold a unique ID of the forked process. 

30 

31Also, you can provide the command `--fork-log-dir YYY` to the original 

32process. In this case, each of the forked processes will not be launched 

33normally. Instead, its stdout and stderr will be piped into two different 

34files inside the directory `YYY`. This makes working with experiments, like 

35in the *moptipy* framework (see <https://thomasweise.github.io/moptipy>) 

36easier. 

37 

38The gist is this: Under the hood, this looks as if you can do Unix-style 

39forks of the Python interpreter. Actually, it launches separate interpreter 

40processes with the same arguments (plus the `fork_id` parameter and minus 

41the forking-arguments). 

42 

43Side note: If you do not provide a logging directory and the number of 

44processes to launch would be 1, then no forking takes place. This then 

45just returns the normal arguments as if no forking parameters were provided 

46at all. 

47""" 

48import sys 

49from argparse import ArgumentParser, Namespace 

50from math import isfinite 

51from os import cpu_count, environ, getpid 

52from platform import node 

53from typing import Final 

54 

55from pycommons.io.console import logger 

56from pycommons.io.path import Path 

57from pycommons.processes.multishell import multi_execute 

58from pycommons.processes.python import PYTHON_ENV, PYTHON_INTERPRETER 

59from pycommons.processes.shell import STREAM_FORWARD, Command 

60from pycommons.processes.shell_to_file import to_files 

61from pycommons.strings.string_tools import replace_str 

62from pycommons.types import type_error 

63 

64 

65def get_cores(use: int | float, n_cpu: int | None = None) -> int | None: 

66 """ 

67 Compute the number of CPU cores to be used (for forking). 

68 

69 :param use: the usage number, either a float between 0 and 1 denoting a 

70 fraction of cores to be used, or the absolute number. 

71 :param n_cpu: the number of CPU cores available, or `None` if we should 

72 determine it automatically. 

73 :return: the number of cores 

74 

75 >>> get_cores(1) 

76 1 

77 >>> get_cores(2) 

78 2 

79 >>> get_cores(2.3) 

80 2 

81 >>> get_cores(2.6) 

82 3 

83 >>> get_cores(0.5, 10) 

84 5 

85 >>> get_cores(0.3, 10) 

86 3 

87 >>> get_cores(0.5, 16) 

88 8 

89 >>> get_cores(0.5, 1) 

90 1 

91 

92 >>> 0 < get_cores(0.5) < 10000 

93 True 

94 

95 >>> try: 

96 ... get_cores("a") 

97 ... except TypeError as te: 

98 ... print(te) 

99 use should be an instance of any in {float, int} but is str, namely 'a'. 

100 

101 >>> try: 

102 ... get_cores(0.3, "a") 

103 ... except TypeError as te: 

104 ... print(te) 

105 n_cpu should be an instance of int but is str, namely 'a'. 

106 

107 >>> try: 

108 ... get_cores(-1) 

109 ... except ValueError as v: 

110 ... print(v) 

111 Invalid value -1 for number of cores to use. 

112 """ 

113 if not isinstance(use, int | float): 

114 raise type_error(use, "use", (int, float)) 

115 if (not isfinite(use)) or (use <= 0) or (use > 1000): 

116 raise ValueError(f"Invalid value {use!r} for number of cores to use.") 

117 if use < 1: 

118 if n_cpu is None: 

119 n_cpu = cpu_count() 

120 if n_cpu is None: 

121 return 1 

122 if not isinstance(n_cpu, int): 

123 raise type_error(n_cpu, "n_cpu", int) 

124 if n_cpu < 1: 

125 raise ValueError(f"Invalid value {n_cpu!r} for n_cpu.") 

126 return max(1, min(n_cpu - 1, round(use * n_cpu))) 

127 return round(use) 

128 

129 

def _remove_flag(arg_list: list[str], name: str) -> None:
    """
    Remove the flag `name` and its value from an argument list, in place.

    Both the two-token form (`name value`) and the single-token form
    (`name=value`) accepted by :mod:`argparse` are handled; the first
    occurrence found is removed.

    :param arg_list: the list of command line arguments to modify in place
    :param name: the name of the flag to remove, e.g., `"--fork"`
    :raises ValueError: if `name` cannot be found, e.g., because the user
        passed an abbreviated flag name that :mod:`argparse` accepted
    """
    prefix: Final[str] = f"{name}="
    for idx, arg in enumerate(arg_list):
        if arg == name:  # two tokens: the flag followed by its value
            del arg_list[idx:idx + 2]
            return
        if arg.startswith(prefix):  # single token: `flag=value`
            del arg_list[idx]
            return
    raise ValueError(f"Did not find flag {name!r} in arguments {arg_list!r}.")


def fork(parser: ArgumentParser) -> Namespace | None:
    """
    Launch this Python process multiple times if requested to.

    If the user provided an argument `--fork NNN`, where `NNN` is either an
    absolute number of processes to launch or a fraction in (0, 1) of logical
    CPU cores to use, then this function will invoke the interpreter the
    corresponding number of times with exactly the same command line arguments
    except the forking parameters.

    You can provide an argument `--fork-log-dir DDD`, where `DDD` is a
    directory. If you do this, then the stdout and stderr of each launched
    process are piped into files inside this directory.

    If forking is done, then each forked process gets an additional argument
    `fork_id` with a unique identifier.

    If no forking arguments are provided, or if we would fork just 1 process
    without logging directory, then no forking is done.
    In this case, this routine just returns the :class:`~argparse.Namespace`
    instance with the command line arguments.
    If this actually already is a forked process, then, too, the
    :class:`~argparse.Namespace` instance with the arguments (plus the
    `fork_id`) is returned.
    If this is the root process from which the forks were started, then `None`
    is returned. In that case, the function returns after all sub-processes
    are completed.

    :param parser: the root argument parser
    :return: `None` if the argument parser contained forking arguments and
        the same process was forked multiple times, otherwise the
        :class:`~argparse.Namespace` with the arguments.
    :raises TypeError: if `parser` is not an :class:`~argparse.ArgumentParser`
    :raises ValueError: if a forked child tries to fork again, or if the
        `--fork` value is invalid
    """
    if not isinstance(parser, ArgumentParser):
        raise type_error(parser, "parser", ArgumentParser)
    arg_fork: Final[str] = "--fork"
    arg_log_dir: Final[str] = f"{arg_fork}-log-dir"
    arg_fork_id: Final[str] = f"{arg_fork}-id"
    parser.add_argument(
        arg_fork, nargs="?", type=float,
        help=("invoke this process multiple times in parallel. --fork can "
              "either be an absolute number of processes to launch or a "
              "fraction in (0, 1) of the available logical CPU cores."))
    parser.add_argument(arg_log_dir, nargs="?", type=Path)
    parser.add_argument(arg_fork_id, nargs="?", type=int)
    args: Namespace = parser.parse_args()

    if args.fork is None:  # no --fork argument: nothing to do here
        return args
    if args.fork_id is not None:  # forked children must not fork again
        raise ValueError("Cannot recursively fork.")

    # Get number of processes.
    try:
        nfork: int = get_cores(args.fork)
    except ValueError as ve:
        raise ValueError(f"Invalid value {args.fork} for {arg_fork}.") from ve

    ld: Path | None = args.fork_log_dir
    if nfork <= 1:
        if ld is None:
            logger("No need to fork: There would be just 1 process "
                   "and no log directory.")
            return args
        nfork = 1  # still fork once, so that output is piped into files

    msg: str = f"Forking {nfork} processes"
    if ld is None:
        msg = f"{msg} without specific log directory."
    else:
        msg = f"{msg} with log directory {ld!r}."
    logger(msg)

    # Build the child command line: the same arguments minus the forking
    # flags, plus the interpreter up front and the fork-ID flag at the end.
    # _remove_flag handles both `--fork NNN` and `--fork=NNN`; the previous
    # implementation assumed the two-token form only and crashed on the
    # `=`-joined form with an unhelpful ValueError.
    fork_args: list[str] = list(sys.argv)
    _remove_flag(fork_args, arg_fork)
    if ld is not None:
        _remove_flag(fork_args, arg_log_dir)
    fork_args.insert(0, PYTHON_INTERPRETER)
    fork_args.append(arg_fork_id)

    use_env: Final[dict[str, str]] = dict(environ)
    use_env.update(PYTHON_ENV)  # enforce the pycommons interpreter settings

    commands: Final[list[Command]] = []
    for fork_id in range(nfork):
        commands.append(Command(
            # Pass a per-process snapshot: the previous code handed the one
            # shared, subsequently-mutated list to every Command, which is
            # only safe if Command copies its argument — TODO confirm; the
            # snapshot is safe either way.
            command=[*fork_args, str(fork_id)],
            stdout=STREAM_FORWARD,
            stderr=STREAM_FORWARD,
            env=use_env))

    if ld is not None:
        ld.ensure_dir_exists()
        # Build a per-host, per-process file name prefix and replace all
        # characters that are awkward in file names with underscores.
        prefix = str.lower(f"{node()}_{getpid()}_fork_")
        for ch in "- \\/\t.+*%!#":
            prefix = str.replace(prefix, ch, "_")
        prefix = replace_str("__", "_", prefix)
        for i, cmd in enumerate(commands):
            commands[i] = to_files(
                command=cmd,
                stdout=ld.resolve_inside(f"{prefix}{i}_stdout.txt"),
                stderr=ld.resolve_inside(f"{prefix}{i}_stderr.txt"))

    multi_execute(commands, True)  # wait until all children have finished
    return None