Coverage for pycommons / processes / shell.py: 98%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 03:04 +0000

1"""The tool for invoking shell commands.""" 

2 

3import subprocess # nosec 

4from dataclasses import dataclass 

5from os import getcwd 

6from typing import Callable, Final, Iterable, Mapping 

7 

8from pycommons.io.console import logger 

9from pycommons.io.path import UTF8, Path, directory_path 

10from pycommons.types import check_int_range, type_error 

11 

12#: ignore the given stream 

13STREAM_IGNORE: Final[int] = 0 

14#: forward given stream to the same stream of this process 

15STREAM_FORWARD: Final[int] = 1 

16#: capture the given stream 

17STREAM_CAPTURE: Final[int] = 2 

18 

19 

20#: the stream mode to string converter 

21_SM: Final[Callable[[int], str]] = { 

22 STREAM_IGNORE: " ignored", 

23 STREAM_FORWARD: " forwarded", 

24 STREAM_CAPTURE: " captured", 

25}.get 

26 

27 

28@dataclass(frozen=True, init=False, order=False, eq=False) 

29class Command: 

30 """ 

31 A class that represents a command that can be executed. 

32 

33 >>> c = Command("test") 

34 >>> c.command 

35 ('test',) 

36 >>> c.working_dir.is_dir() 

37 True 

38 >>> c.timeout 

39 3600 

40 

41 >>> d = Command(("test", "b")) 

42 >>> d.command 

43 ('test', 'b') 

44 >>> d.working_dir == c.working_dir 

45 True 

46 >>> d.timeout == c.timeout 

47 True 

48 

49 >>> e = Command(("", "test", " b", " ")) 

50 >>> e.command == d.command 

51 True 

52 >>> e.working_dir == c.working_dir 

53 True 

54 >>> e.timeout == c.timeout 

55 True 

56 

57 >>> try: 

58 ... Command(1) 

59 ... except TypeError as te: 

60 ... print(str(te)[:50]) 

61 command should be an instance of any in {str, typi 

62 

63 >>> try: 

64 ... Command([1]) 

65 ... except TypeError as te: 

66 ... print(te) 

67 descriptor 'strip' for 'str' objects doesn't apply to a 'int' object 

68 

69 >>> try: 

70 ... Command(["x", 1]) 

71 ... except TypeError as te: 

72 ... print(te) 

73 descriptor 'strip' for 'str' objects doesn't apply to a 'int' object 

74 

75 >>> try: 

76 ... Command([]) 

77 ... except ValueError as ve: 

78 ... print(ve) 

79 Invalid command []. 

80 

81 >>> try: 

82 ... Command([""]) 

83 ... except ValueError as ve: 

84 ... print(ve) 

85 Invalid command ['']. 

86 

87 >>> try: 

88 ... Command("") 

89 ... except ValueError as ve: 

90 ... print(ve) 

91 Invalid command ['']. 

92 

93 >>> Command("x", working_dir=Path(__file__).up(1)).command 

94 ('x',) 

95 

96 >>> try: 

97 ... Command("x", working_dir=1) 

98 ... except TypeError as te: 

99 ... print(te) 

100 descriptor '__len__' requires a 'str' object but received a 'int' 

101 

102 >>> try: 

103 ... Command("x", working_dir=Path(__file__)) 

104 ... except ValueError as ve: 

105 ... print(str(ve)[-30:]) 

106 does not identify a directory. 

107 

108 >>> Command("x", timeout=23).timeout 

109 23 

110 

111 >>> try: 

112 ... Command("x", timeout=1.2) 

113 ... except TypeError as te: 

114 ... print(te) 

115 timeout should be an instance of int but is float, namely 1.2. 

116 

117 >>> try: 

118 ... Command("x", timeout=None) 

119 ... except TypeError as te: 

120 ... print(te) 

121 timeout should be an instance of int but is None. 

122 

123 >>> try: 

124 ... Command("x", timeout=0) 

125 ... except ValueError as ve: 

126 ... print(ve) 

127 timeout=0 is invalid, must be in 1..1000000. 

128 

129 >>> try: 

130 ... Command("x", timeout=1_000_001) 

131 ... except ValueError as ve: 

132 ... print(ve) 

133 timeout=1000001 is invalid, must be in 1..1000000. 

134 

135 >>> try: 

136 ... Command("x", stdin=1_000_001) 

137 ... except TypeError as te: 

138 ... print(str(te)[:49]) 

139 stdin should be an instance of any in {None, str} 

140 

141 >>> sxx = str(Command("x", env={"A": "B", "C": "D"})) 

142 >>> sxx[sxx.index("with "):sxx.index("with ") + 30] 

143 'with <env> no stdin, stdout ig' 

144 

145 >>> try: 

146 ... Command("x", env={"A": "B", "C": 1}) 

147 ... except TypeError as te: 

148 ... print(str(te)) 

149 descriptor 'strip' for 'str' objects doesn't apply to a 'int' object 

150 

151 >>> try: 

152 ... Command("x", env=1) 

153 ... except TypeError as te: 

154 ... print(str(te)) 

155 env should be an instance of typing.Mapping but is int, namely 1. 

156 

157 >>> str(Command("x", env=dict()))[0:10] 

158 "('x',) in " 

159 """ 

160 

161 #: the command line. 

162 command: tuple[str, ...] 

163 #: the working directory 

164 working_dir: Path 

165 #: the timeout in seconds, after which the process will be terminated 

166 timeout: int 

167 #: the data to be written to stdin 

168 stdin: str | None 

169 #: how to handle the standard output stream 

170 stdout: int 

171 #: how to handle the standard error stream 

172 stderr: int 

173 #: the environment variables to pass to the new process, if any 

174 env: tuple[tuple[str, str], ...] | None 

175 

176 def __init__(self, command: str | Iterable[str], 

177 working_dir: str | None = None, 

178 timeout: int | None = 3600, 

179 stdin: str | None = None, 

180 stdout: int = STREAM_IGNORE, 

181 stderr: int = STREAM_IGNORE, 

182 env: Mapping[str, str] | None = None) -> None: 

183 """ 

184 Create the command. 

185 

186 :param command: the command string or iterable 

187 :param working_dir: the working directory 

188 :param timeout: the timeout 

189 :param stdin: a string to be written to stdin, or `None` 

190 :param stdout: how to handle the standard output stream 

191 :param stderr: how to handle the standard error stream 

192 """ 

193 if isinstance(command, str): 

194 command = [command] 

195 elif not isinstance(command, Iterable): 

196 raise type_error(command, "command", (str, Iterable)) 

197 object.__setattr__(self, "command", tuple( 

198 s for s in map(str.strip, command) if str.__len__(s) > 0)) 

199 if tuple.__len__(self.command) <= 0: 

200 raise ValueError(f"Invalid command {command!r}.") 

201 

202 object.__setattr__(self, "working_dir", directory_path( 

203 getcwd() if working_dir is None else working_dir)) 

204 

205 object.__setattr__(self, "timeout", check_int_range( 

206 timeout, "timeout", 1, 1_000_000)) 

207 

208 if (stdin is not None) and (not isinstance(stdin, str)): 

209 raise type_error(stdin, "stdin", (str, None)) 

210 object.__setattr__(self, "stdin", stdin) 

211 

212 object.__setattr__(self, "stdout", check_int_range( 

213 stdout, "stdout", 0, 2)) 

214 object.__setattr__(self, "stderr", check_int_range( 

215 stderr, "stderr", 0, 2)) 

216 

217 the_env: tuple[tuple[str, str], ...] | None = None 

218 if env is not None: 

219 if not isinstance(env, Mapping): 

220 raise type_error(env, "env", Mapping) 

221 if len(env) > 0: 

222 the_env = tuple(sorted((str.strip(k), str.strip(v)) 

223 for k, v in env.items())) 

224 object.__setattr__(self, "env", the_env) 

225 

226 def __str__(self) -> str: 

227 """ 

228 Get the string representation of this command. 

229 

230 This string includes most of the important information, but it will 

231 never include the environment variables. These variables may contain 

232 security sensitive stuff, so they are not printed. Instead. if an 

233 environment is specified, this will just be printed as `<env>`. 

234 

235 :returns: A string representing this command 

236 

237 >>> str(Command("a"))[-50:] 

238 ' with no stdin, stdout ignored, and stderr ignored' 

239 >>> str(Command("x"))[:11] 

240 "('x',) in '" 

241 >>> "with 3 chars of stdin" in str(Command("x", stdin="123")) 

242 True 

243 >>> str(Command("a", env={"y": "x"}))[-65:] 

244 'for 3600s with <env> no stdin, stdout ignored, and stderr ignored' 

245 """ 

246 si: str = "no" if self.stdin is None \ 

247 else f"{str.__len__(self.stdin)} chars of" 

248 ev: str = "" if self.env is None else "<env> " 

249 return (f"{self.command!r} in {self.working_dir!r} for {self.timeout}" 

250 f"s with {ev}{si} stdin, stdout{_SM(self.stdout)}, and " 

251 f"stderr{_SM(self.stderr)}") 

252 

253 def execute(self, log_call: bool = True) -> tuple[str | None, str | None]: 

254 r""" 

255 Execute the given process. 

256 

257 :param log_call: should the call be logged? If `True`, the 

258 string representation of the :class:`Command` will be 

259 written to the `logger`, otherwise nothing is logged. 

260 Note: The environment, if any, will not be printed for security 

261 reasons. 

262 :returns: a tuple with the standard output and standard error, which 

263 are only not `None` if they were supposed to be captured 

264 :raises TypeError: if any argument has the wrong type 

265 :raises ValueError: if execution of the process failed 

266 

267 >>> Command(("echo", "123"), stdout=STREAM_CAPTURE).execute(False) 

268 ('123\n', None) 

269 

270 >>> Command(("echo", "", "123"), stdout=STREAM_CAPTURE).execute(False) 

271 ('123\n', None) 

272 

273 >>> from contextlib import redirect_stdout 

274 >>> with redirect_stdout(None): 

275 ... s = Command(("echo", "123"), stdout=STREAM_CAPTURE).execute() 

276 >>> print(s) 

277 ('123\n', None) 

278 

279 >>> Command("cat", stdin="test", stdout=STREAM_CAPTURE).execute(False) 

280 ('test', None) 

281 

282 >>> Command("cat", stdin="test").execute(False) 

283 (None, None) 

284 

285 >>> try: 

286 ... with redirect_stdout(None): 

287 ... Command(("ping", "blabla!")).execute(True) 

288 ... except ValueError as ve: 

289 ... ss = str(ve) 

290 ... print(ss[:20] + " ... " + ss[-22:]) 

291 ('ping', 'blabla!') ... yields return code 2. 

292 

293 >>> try: 

294 ... with redirect_stdout(None): 

295 ... Command(("ping", "www.example.com", "-i 20"), 

296 ... timeout=1).execute(True) 

297 ... except ValueError as ve: 

298 ... print("timed out after" in str(ve)) 

299 True 

300 

301 >>> try: 

302 ... Command("x").execute(None) 

303 ... except TypeError as te: 

304 ... print(te) 

305 log_call should be an instance of bool but is None. 

306 

307 >>> try: 

308 ... Command("x").execute(1) 

309 ... except TypeError as te: 

310 ... print(te) 

311 log_call should be an instance of bool but is int, namely 1. 

312 

313 >>> with redirect_stdout(None): 

314 ... r = Command(("echo", "1"), stderr=STREAM_CAPTURE).execute( 

315 ... True) 

316 >>> r 

317 (None, '') 

318 

319 >>> with redirect_stdout(None): 

320 ... r = Command(("printenv", ), 

321 ... stdout=STREAM_CAPTURE, 

322 ... env={"BLA": "XX"}).execute(True) 

323 >>> r 

324 ('BLA=XX\n', None) 

325 """ 

326 if not isinstance(log_call, bool): 

327 raise type_error(log_call, "log_call", bool) 

328 message: Final[str] = str(self) 

329 if log_call: 

330 logger(f"Now invoking {message}.") 

331 

332 arguments: Final[dict[str, str | Iterable[str] | bool | int]] = { 

333 "args": self.command, 

334 "check": False, 

335 "text": True, 

336 "timeout": self.timeout, 

337 "cwd": self.working_dir, 

338 "errors": "strict", 

339 "encoding": UTF8, 

340 } 

341 

342 if self.stdin is not None: 

343 arguments["input"] = self.stdin 

344 

345 arguments["stdout"] = 1 if self.stdout == STREAM_FORWARD else ( 

346 subprocess.PIPE if self.stdout == STREAM_CAPTURE 

347 else subprocess.DEVNULL) 

348 arguments["stderr"] = 2 if self.stderr == STREAM_FORWARD else ( 

349 subprocess.PIPE if self.stderr == STREAM_CAPTURE 

350 else subprocess.DEVNULL) 

351 

352 if self.env is not None: 

353 arguments["env"] = {t[0]: t[1] for t in self.env} 

354 

355 try: 

356 # noqa # nosemgrep # pylint: disable=W1510 # type: ignore 

357 ret: Final[subprocess.CompletedProcess] = \ 

358 subprocess.run(**arguments) # type: ignore # nosec # noqa 

359 except (TimeoutError, subprocess.TimeoutExpired) as toe: 

360 if log_call: 

361 logger(f"Failed executing {self} with timeout {toe}.") 

362 raise ValueError(f"{message} timed out: {toe}.") from toe 

363 

364 returncode: Final[int] = ret.returncode 

365 if returncode != 0: 

366 if log_call: 

367 logger(f"Failed executing {self}: got return" 

368 f" code {returncode}.") 

369 raise ValueError(f"{message} yields return code {returncode}.") 

370 

371 stdout: str | None = None 

372 if self.stdout == STREAM_CAPTURE: 

373 stdout = ret.stdout 

374 if not isinstance(stdout, str): 

375 raise type_error(stdout, f"stdout of {self}", stdout) 

376 

377 stderr: str | None = None 

378 if self.stderr == STREAM_CAPTURE: 

379 stderr = ret.stderr 

380 if not isinstance(stderr, str): 

381 raise type_error(stderr, f"stderr of {self}", stderr) 

382 

383 if log_call: 

384 capture: str = "" 

385 if stdout is not None: 

386 capture = f", captured {str.__len__(stdout)} chars of stdout" 

387 if stderr is not None: 

388 capture = f"{capture} and " if str.__len__(capture) > 0 \ 

389 else ", captured " 

390 capture = f"{capture}{str.__len__(stderr)} chars of stderr" 

391 logger(f"Finished executing {self} with return code 0{capture}.") 

392 

393 return stdout, stderr