Coverage for bookbuilderpy/shell.py: 76%

46 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-17 23:15 +0000

1"""The tool for invoking shell commands.""" 

2 

3import subprocess # nosec 

4from typing import Callable, Iterable 

5 

6from bookbuilderpy.logger import logger 

7from bookbuilderpy.path import Path 

8 

9 

10def shell(command: str | Iterable[str], 

11 timeout: int = 3600, 

12 cwd: str | None = None, 

13 wants_stdout: bool = False, 

14 exit_code_to_str: dict[int, str] | None = None, 

15 check_stderr: Callable[[str], BaseException | None] 

16 = lambda x: None) -> str | None: 

17 """ 

18 Execute a text-based command on the shell. 

19 

20 The command is executed and its stdout and stderr and return code are 

21 captured. If the command had a non-zero exit code, an exception is 

22 thrown. The command itself, as well as the parameters are logged via 

23 the logger. If wants_stdout is True, the command's stdout is returned. 

24 Otherwise, None is returned. 

25 

26 :param command: the command to execute 

27 :param timeout: the timeout 

28 :param cwd: the directory to run inside 

29 :param wants_stdout: if `True`, the stdout is returned, if `False`, 

30 `None` is returned 

31 :param exit_code_to_str: an optional map 

32 converting erroneous exit codes to strings 

33 :param check_stderr: an optional callable that is applied to the std_err 

34 string and may raise an exception if need be 

35 """ 

36 if timeout < 0: 

37 raise ValueError(f"Timeout must be positive, but is {timeout}.") 

38 if not command: 

39 raise ValueError(f"Empty command '{command}'!") 

40 cmd = [s.strip() for s in list(command)] 

41 cmd = [s for s in cmd if s] 

42 if not cmd: 

43 raise ValueError(f"Empty stripped command '{cmd}'!") 

44 execstr = "' '".join(cmd) 

45 

46 if cwd: 

47 wd = Path.directory(cwd) 

48 execstr = f"'{execstr}' in '{wd}'" 

49 logger(f"executing {execstr}.") 

50 # nosemgrep 

51 ret = subprocess.run(cmd, check=False, text=True, # nosec # noqa 

52 timeout=timeout, # nosec # noqa 

53 capture_output=True, # nosec # noqa 

54 cwd=wd) # nosec # noqa 

55 else: 

56 execstr = f"'{execstr}'" 

57 logger(f"executing {execstr}.") 

58 # nosemgrep 

59 ret = subprocess.run(cmd, check=False, text=True, # nosec # noqa 

60 timeout=timeout, # nosec # noqa 

61 capture_output=True) # nosec # noqa 

62 

63 logging = [f"finished executing {execstr}.", 

64 f"obtained return value {ret.returncode}."] 

65 

66 if (ret.returncode != 0) and exit_code_to_str: 

67 ec: str | None = exit_code_to_str.get(ret.returncode, None) 

68 if ec: 

69 logging.append(f"meaning of return value: {ec}") 

70 

71 stdout = ret.stdout 

72 if stdout: 

73 stdout = stdout.strip() 

74 if stdout: 

75 logging.append(f"\nstdout:\n{stdout}") 

76 else: 

77 stdout = "" 

78 

79 stderr = ret.stderr 

80 if stderr: 

81 stderr = stderr.strip() 

82 if stderr: 

83 logging.append(f"\nstderr:\n{stderr}") 

84 logger("\n".join(logging)) 

85 

86 if ret.returncode != 0: 

87 raise ValueError( 

88 f"Error {ret.returncode} when executing {execstr} compressor.") 

89 

90 if stderr: 

91 exception = check_stderr(stderr) 

92 if exception is not None: 

93 raise exception 

94 

95 return stdout if wants_stdout else None