Coverage for bookbuilderpy/compress.py: 21%

67 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-17 23:15 +0000

1"""Routines for compressing lists of files.""" 

2 

3import os.path 

4from typing import Iterable, cast 

5 

6from bookbuilderpy.build_result import File 

7from bookbuilderpy.logger import logger 

8from bookbuilderpy.path import Path 

9from bookbuilderpy.shell import shell 

10from bookbuilderpy.temp import TempFile 

11from bookbuilderpy.types import type_error 

12from bookbuilderpy.versions import TOOL_TAR, TOOL_XZ, TOOL_ZIP, has_tool 

13 

14 

15def __paths(source: Iterable[Path | File | str]) \ 

16 -> tuple[None | Path, list[str]]: 

17 """ 

18 Convert the iterable of input files into a common path list. 

19 

20 :param source: the paths 

21 :return: a tuple of a common base path (if any) and the paths 

22 """ 

23 files: list[Path] = [] 

24 for f in source: 

25 if isinstance(f, Path): 

26 f.enforce_file() 

27 files.append(f) 

28 elif isinstance(f, File): 

29 f.path.enforce_file() 

30 files.append(f.path) 

31 elif isinstance(f, str): 

32 files.append(Path.file(f)) 

33 else: 

34 raise type_error(f, "path element", (Path, File, str)) 

35 if len(files) <= 1: 

36 raise ValueError("Nothing to compress?") 

37 

38 base_dir = os.path.commonpath(files) 

39 if base_dir: 

40 return Path.directory(base_dir), \ 

41 [f.relative_to(base_dir) for f in files] 

42 return None, cast(list[str], files) 

43 

44 

45def can_xz_compress() -> bool: 

46 """ 

47 Check if xz compression is available. 

48 

49 :return: `True` if xz compression is available, `False` otherwise 

50 """ 

51 return has_tool(TOOL_TAR) and has_tool(TOOL_XZ) 

52 

53 

54def compress_xz(source: Iterable[Path | File | str], 

55 dest: str) -> File: 

56 """ 

57 Compress a sequence of files to tar.xz. 

58 

59 :param source: the list of files 

60 :param dest: the destination file 

61 :return: the created archive 

62 """ 

63 if not has_tool(TOOL_TAR): 

64 raise ValueError(f"tool {TOOL_TAR} not installed.") 

65 if not has_tool(TOOL_XZ): 

66 raise ValueError(f"tool {TOOL_XZ} not installed.") 

67 

68 base_dir, files = __paths(source) 

69 logger(f"Applying tar.xz compression to {files}.") 

70 

71 out = Path.path(dest) 

72 if os.path.exists(out): 

73 raise ValueError(f"File '{out}' already exists!") 

74 out_dir = Path.directory(os.path.dirname(out)) 

75 

76 paths = '" "'.join(files) 

77 if not base_dir: 

78 base_dir = out_dir 

79 

80 with TempFile.create() as tf: 

81 tf.write_all( 

82 f'#!/bin/bash\n{TOOL_TAR} -c "{paths}" | ' 

83 f'{TOOL_XZ} -v -9e -c > "{out}"\n') 

84 shell(["sh", tf], timeout=360, cwd=base_dir) 

85 

86 return File(out) 

87 

88 

89def can_zip_compress() -> bool: 

90 """ 

91 Check if zip compression is available. 

92 

93 :return: `True` if zip compression is available, `False` otherwise 

94 """ 

95 return has_tool(TOOL_ZIP) 

96 

97 

98def compress_zip(source: Iterable[Path | File | str], 

99 dest: str) -> File: 

100 """ 

101 Compress a sequence of files to zip. 

102 

103 :param source: the list of files 

104 :param dest: the destination file 

105 :return: the created archive 

106 """ 

107 if not has_tool(TOOL_ZIP): 

108 raise ValueError(f"Tool {TOOL_ZIP} not installed.") 

109 

110 base_dir, files = __paths(source) 

111 logger(f"Applying zip compression to {files}.") 

112 

113 out = Path.path(dest) 

114 if os.path.exists(out): 

115 raise ValueError(f"File '{out}' already exists!") 

116 out_dir = Path.directory(os.path.dirname(out)) 

117 

118 if not base_dir: 

119 base_dir = out_dir 

120 

121 files.insert(0, out) 

122 files.insert(0, "-9") 

123 files.insert(0, "-X") 

124 files.insert(0, "-UN=UTF8") 

125 files.insert(0, TOOL_ZIP) 

126 

127 shell(files, timeout=360, cwd=base_dir) # nosec 

128 return File(out)