Coverage for pycommons / processes / multishell.py: 98%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 03:11 +0000

1r""" 

2A tool for running multiple commands in parallel. 

3 

4>>> from pycommons.processes.shell import STREAM_CAPTURE 

5>>> c1 = Command(("echo", "123"), stdout=STREAM_CAPTURE) 

6>>> c2 = Command(("echo", "abc"), stdout=STREAM_CAPTURE) 

7 

8>>> multi_execute((c1, c2), True) 

9(('123\n', None), ('abc\n', None)) 

10 

11>>> multi_execute((c1, ), False) 

12(('123\n', None),) 

13 

14>>> multi_execute((c1, c2, c2, c2), True) 

15(('123\n', None), ('abc\n', None), ('abc\n', None), ('abc\n', None)) 

16""" 

17 

18from threading import Thread 

19from typing import Any, Final, Iterable 

20 

21from pycommons.io.console import logger 

22from pycommons.processes.shell import Command 

23from pycommons.types import type_error 

24 

25 

26def __exec(command: Command, idx: int, 

27 out: list[tuple[str | None, str | None]], 

28 log_call: bool) -> None: 

29 """ 

30 Perform the actual execution of a command. 

31 

32 :param command: the command to execute 

33 :param idx: the index of the command to execute 

34 :param out: the output list to receive the command output 

35 :param log_call: shall the call be logged? 

36 """ 

37 out[idx] = command.execute(log_call=log_call) 

38 

39 

40def multi_execute(commands: Iterable[Command], log: bool = True) \ 

41 -> tuple[tuple[str | None, str | None], ...]: 

42 r""" 

43 Execute multiple commands in parallel. 

44 

45 :param commands: the iterable of the commands to execute 

46 :param log: shall the execution state be logged? 

47 :returns: the results of the commands 

48 

49 >>> from pycommons.processes.shell import STREAM_CAPTURE 

50 >>> c1 = Command(("echo", "123"), stdout=STREAM_CAPTURE) 

51 >>> c2 = Command(("echo", "abc"), stdout=STREAM_CAPTURE) 

52 

53 >>> multi_execute((), False) 

54 () 

55 >>> multi_execute((), True) 

56 () 

57 

58 >>> multi_execute((c1, ), False) 

59 (('123\n', None),) 

60 >>> multi_execute((c1, ), True) 

61 (('123\n', None),) 

62 

63 >>> multi_execute((c1, c2), False) 

64 (('123\n', None), ('abc\n', None)) 

65 >>> multi_execute((c1, c2), True) 

66 (('123\n', None), ('abc\n', None)) 

67 

68 >>> multi_execute((c1, c2, c2, c2), True) 

69 (('123\n', None), ('abc\n', None), ('abc\n', None), ('abc\n', None)) 

70 >>> multi_execute((c1, c2, c2, c2), False) 

71 (('123\n', None), ('abc\n', None), ('abc\n', None), ('abc\n', None)) 

72 

73 >>> try: 

74 ... multi_execute(1) 

75 ... except TypeError as te: 

76 ... print(te) 

77 commands should be an instance of typing.Iterable but is int, namely 1. 

78 

79 >>> try: 

80 ... multi_execute((c1, c2), 3) 

81 ... except TypeError as te: 

82 ... print(te) 

83 log should be an instance of bool but is int, namely 3. 

84 

85 >>> try: 

86 ... multi_execute(("x", )) 

87 ... except TypeError as te: 

88 ... print(str(te)[:20]) 

89 commands[0] should b 

90 """ 

91 if not isinstance(commands, Iterable): 

92 raise type_error(commands, "commands", Iterable) 

93 if not isinstance(log, bool): 

94 raise type_error(log, "log", bool) 

95 

96 threads: Final[list[Thread]] = [] 

97 out: list[tuple[str | None, str | None]] = [] 

98 kwargs: Final[dict[str, Any]] = {"log_call": log, "out": out} 

99 command: Command | None = None 

100 for idx, command in enumerate(commands): 

101 if not isinstance(command, Command): 

102 raise type_error(command, f"commands[{idx}]", Command) 

103 kw = dict(kwargs) 

104 kw["command"] = command 

105 kw["idx"] = idx 

106 out.append((None, None)) 

107 threads.append(Thread(target=__exec, kwargs=kw)) 

108 

109 llen: Final[int] = list.__len__(threads) 

110 if llen <= 0: 

111 if log: 

112 logger("No command to execute, quitting.") 

113 return () 

114 if llen <= 1: 

115 if log: 

116 logger("Only one command, not using threads.") 

117 if command is None: 

118 raise ValueError("Huh?") 

119 return (command.execute(log_call=False), ) 

120 

121 if log: 

122 logger(f"Executing {llen} processes by using threads.") 

123 for thread in threads: 

124 thread.start() 

125 if log: 

126 logger(f"All {llen} processes have started.") 

127 for thread in threads: 

128 thread.join() 

129 if log: 

130 logger(f"All {llen} processes have completed.") 

131 return tuple(out)