Coverage for pycommons / processes / system_state.py: 81%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 03:04 +0000

1""" 

2Functionality to log the current system state. 

3 

4Here we provide a small program that can be executed concurrently with other 

5activities and that logs information about the system state. This may be 

6useful when running some computationally heavy experiments to find potential 

7problems. 

8""" 

9 

10 

11import datetime 

12from argparse import ArgumentParser 

13from contextlib import AbstractContextManager, nullcontext, suppress 

14from time import sleep 

15from typing import Any, Callable, Final, Iterable 

16 

17from psutil import ( # type: ignore 

18 cpu_times, # type: ignore 

19 cpu_times_percent, # type: ignore 

20 disk_partitions, # type: ignore 

21 disk_usage, # type: ignore 

22 sensors_fans, # type: ignore 

23 sensors_temperatures, # type: ignore 

24 virtual_memory, # type: ignore 

25) 

26 

27from pycommons.io.arguments import pycommons_argparser 

28from pycommons.io.csv import CSV_SEPARATOR, SCOPE_SEPARATOR 

29from pycommons.math.int_math import try_int 

30from pycommons.strings.chars import WHITESPACE_OR_NEWLINE 

31from pycommons.strings.string_tools import replace_str 

32from pycommons.types import check_int_range, type_error 

33 

#: Shortcut for obtaining the current date and time.
__DTN: Final[Callable[[], datetime.datetime]] = datetime.datetime.now

#: The single characters that are replaced with the `SCOPE_SEPARATOR` when
#: a key is cleaned: slash, backslash, both curly braces, every character
#: of `WHITESPACE_OR_NEWLINE`, and every character of `CSV_SEPARATOR`.
__REPL: Final[tuple[str, ...]] = (
    *"/\\{}", *f"{WHITESPACE_OR_NEWLINE}", *CSV_SEPARATOR)

#: Two consecutive scope separators, i.e., what gets collapsed into one.
__DOUBLE_SCOPE: Final[str] = SCOPE_SEPARATOR * 2

43 

44 

def __fix_key(key: Any) -> str | None:
    """
    Turn an arbitrary object into a clean key string, if possible.

    Only strings can become keys. Surrounding white space is removed, every
    character listed in `__REPL` is replaced with the `SCOPE_SEPARATOR`,
    doubled separators are merged via `replace_str`, and separators (plus
    white space) at either end of the key are cut off.

    :param key: the raw key, may be any object
    :returns: the cleaned key, or `None` if `key` is not a string or if
        nothing is left after the cleaning

    >>> print(__fix_key(None))
    None
    >>> print(__fix_key(1))
    None
    >>> print(__fix_key(""))
    None
    >>> print(__fix_key(" "))
    None
    >>> print(__fix_key("."))
    None
    >>> print(__fix_key(". ."))
    None
    >>> print(__fix_key("...."))
    None
    >>> __fix_key(".d x")
    'd.x'
    >>> __fix_key(".d ..x")
    'd.x'
    >>> __fix_key(".v yd ..x yxc .")
    'v.yd.x.yxc'
    """
    if not isinstance(key, str):
        return None
    cleaned: str = key.strip()
    if not cleaned:
        return None
    for forbidden in __REPL:
        cleaned = cleaned.replace(forbidden, SCOPE_SEPARATOR)
    cleaned = replace_str(__DOUBLE_SCOPE, SCOPE_SEPARATOR, cleaned).strip()
    # Peel off separators, and any white space they expose, at both ends.
    while cleaned.startswith(SCOPE_SEPARATOR):
        cleaned = cleaned[1:].strip()
    while cleaned.endswith(SCOPE_SEPARATOR):
        cleaned = cleaned[:-1].strip()
    return cleaned or None

86 

87 

def __collect_attrs(prefix: str, data: Any, fields: Iterable[str],
                    collector: Callable[[str, str], Any]) -> None:
    """
    Report selected attributes of an object to a collector.

    For each name in `fields`, the attribute of that name is looked up on
    `data`. If it exists and is not `None`, the collector receives the key
    obtained by cleaning `prefix + name` with `__fix_key` together with the
    `repr` of the attribute value. Missing attributes, `None` values, and
    names that do not yield a valid key are skipped silently.

    :param prefix: the string put in front of every attribute name
    :param data: the object to read the attributes from, may be `None`,
        in which case nothing is reported
    :param fields: the names of the attributes to report
    :param collector: the callable receiving the `(key, value)` strings

    >>> def __ptr(a: str, b: str) -> None:
    ...     print(f"{a}: {b}")

    >>> __collect_attrs("", None, (), __ptr)

    >>> __collect_attrs("", "a", ("__class__", ), __ptr)
    __class__: <class 'str'>

    >>> __collect_attrs("prefix.", "a", ("__class__", ), __ptr)
    prefix.__class__: <class 'str'>

    >>> __collect_attrs("prefix.", "a", ("__class__", ), __ptr)
    prefix.__class__: <class 'str'>

    >>> __collect_attrs("prefix.", "a", ("__class__", "__class__"), __ptr)
    prefix.__class__: <class 'str'>
    prefix.__class__: <class 'str'>
    """
    if data is None:
        return
    for field in fields:
        if not hasattr(data, field):
            continue
        value: Any = getattr(data, field)
        if value is None:
            continue
        fixed: str | None = __fix_key(f"{prefix}{field}")
        if fixed is None:
            continue
        collector(fixed, repr(value))

125 

126 

def __collect_struct(prefix: str, data: Any, fields: Iterable[str],
                     collector: Callable[[str, str], Any]) -> None:
    """
    Pass a structured info system to a collector.

    `data` is processed only if it is a dictionary. Each string key of the
    dictionary is treated as a group name and its value, if iterable, as a
    sequence of records. For every record that is not `None`, the attributes
    listed in `fields` are handed to `__collect_attrs`. The keys are built
    as `prefix.group.label.field` if the record has a non-empty string
    attribute `label`, and as `prefix.group.field` otherwise. Duplicate or
    leading separators are removed later by the key cleaning.

    :param prefix: the prefix to use for all keys; a separator between the
        prefix and the group name is inserted automatically
    :param data: the data record, anything but a `dict` is ignored
    :param fields: the fields on the per-row basis
    :param collector: the collector to receive the strings

    >>> def __ptr(a: str, b: str) -> None:
    ...     print(f"{a}: {b}")

    >>> __collect_struct("", None, (), __ptr)

    >>> from types import SimpleNamespace as __sn
    >>> __collect_struct("temperature.", {"coretemp": [
    ...     __sn(label="Core 0", current=40.0),
    ...     __sn(label="", current=41.0)]}, ("current", ), __ptr)
    temperature.coretemp.Core.0.current: 40.0
    temperature.coretemp.current: 41.0

    >>> __collect_struct("fan speed", {"thinkpad": [
    ...     __sn(label="fan1", current=2500)]}, ("current", ), __ptr)
    fan.speed.thinkpad.fan1.current: 2500
    """
    if not isinstance(data, dict):
        return
    prefix = str.strip(prefix)
    # The fields are walked once per record, so one-shot iterables such as
    # generators must be materialized first.
    fields = tuple(fields)
    for key, row in data.items():
        if not (isinstance(key, str) and isinstance(row, Iterable)):
            continue
        # The prefix belongs in front of *every* record of the group, not
        # only in front of labelled ones. The explicit "." guarantees a
        # separator even if the prefix does not end with one; surplus
        # separators are collapsed when the key is cleaned.
        base: str = f"{prefix}.{str.strip(key)}."
        for element in row:
            if element is None:
                continue
            name: str = base
            label: Any = getattr(element, "label", None)
            if isinstance(label, str):
                label = str.strip(label)
                if label:
                    name = f"{base}{label}."
            __collect_attrs(name, element, fields, collector)

158 

159 

def collect_system_state(
        collector: Callable[[str, str], Any]) -> None:
    """
    Collect the current system state and pass it to a collector.

    The state is reported as a sequence of key-value pairs of strings. The
    first pair is always `now` with the current time stamp, followed by the
    components of the current date and time. After that, CPU times, per-CPU
    usage, memory, disk usage, temperature, and fan information are
    reported on a best-effort basis: if one of these sources of information
    fails with an ordinary exception, it is skipped silently and the
    remaining ones are still queried. `KeyboardInterrupt` and `SystemExit`
    are *not* swallowed, so the collection can be interrupted.

    :param collector: the collector to receive the key-value tuples
    :raises TypeError: if `collector` is not callable

    >>> def __ptr(a: str, b: str) -> None:
    ...     pass

    >>> s = collect_system_state(__ptr)

    >>> try:
    ...     collect_system_state(None)
    ... except TypeError as te:
    ...     print(te)
    collector should be a callable but is None.
    """
    if not callable(collector):
        raise type_error(collector, "collector", call=True)

    now: Final = __DTN()
    collector("now", repr(try_int(now.timestamp())))
    __collect_attrs("now.", now, (
        "year", "month", "day", "hour", "minute", "second", "microsecond"),
        collector)

    # Each information source gets its own `suppress` so that a failure of
    # one source does not prevent the others from being reported. Only
    # `Exception` is suppressed: suppressing `BaseException` would also eat
    # `KeyboardInterrupt` and `SystemExit`.
    with suppress(Exception):
        __collect_attrs("cpu_times.", cpu_times(), ("user", "system", "idle"),
                        collector)

    with suppress(Exception):
        cpup: Any = cpu_times_percent(percpu=True)
        if isinstance(cpup, Iterable):
            for i, z in enumerate(cpup):
                __collect_attrs(f"cpu_{i}_usage.", z, (
                    "user", "system", "idle"), collector)

    with suppress(Exception):
        __collect_attrs("memory.", virtual_memory(), (
            "total", "available", "percent", "used", "free"), collector)

    with suppress(Exception):
        dps: Any = disk_partitions(False)
        if isinstance(dps, Iterable):
            for disk in dps:
                mp: Any = getattr(disk, "mountpoint", None)
                if not isinstance(mp, str):
                    continue
                # mount points of snap packages are not reported
                if mp.startswith(("/snap/", "/var/snap/")):
                    continue
                # a failing disk must not hide the remaining disks
                with suppress(Exception):
                    __collect_attrs(f"disk.{mp}.", disk_usage(mp), (
                        "total", "used", "free", "percent"), collector)

    with suppress(Exception):
        __collect_struct("temperature.", sensors_temperatures(False), (
            "current", "high", "critical"), collector)

    with suppress(Exception):
        # The trailing "." keeps the prefix separated from the sensor name,
        # exactly as for all other prefixes above.
        __collect_struct("fan speed.", sensors_fans(), ("current", ),
                         collector)

224 

225 

def log_system_state(interval_seconds: int = 300,
                     should_stop: Callable[[], bool] = lambda: False,
                     lock: AbstractContextManager = nullcontext()) -> None:
    r"""
    Log the system state periodically to the stdout.

    This function allows for periodic logging of the system state to the
    standard output. This can be launched as a program running besides an
    experiment in order to help tracking potential problems. Let's say that
    your experiment or whatever program crashes for unclear reasons. Why did
    it crash? We don't know. Maybe it crashed because it ran out of memory.
    Maybe it ran out of disk space? Maybe not? Who knows. If you let this
    function here run concurrently to your program and pipe its output to a
    log file, then at least you will be able to see if the system slowly runs
    out of memory, disk space, or if the CPU gets too hot, or something. Or,
    at least, you can rule out that this is the case.

    The output is presented in CSV format. Therefore, you can pipe it to a
    file and later open it in Excel or whatever. This allows you to draw
    diagrams of the usage of CPUs and memory or the temperature of the CPU
    over time.

    The first line printed is the header with the keys found when the
    function starts. Every following line has exactly one column per header
    key. If a value is not available in a later sample, its column is left
    empty so that the columns never shift.

    `should_stop` is queried before each sample and again directly after
    each sample was printed, i.e., before sleeping.

    :param interval_seconds: the number of seconds to sleep between two
        samples, must be in `1..1_000_000_000`
    :param should_stop: a function telling the logger when it should stop
    :param lock: a shared lock for the console access; it is entered anew
        around every single line that is printed and therefore must be
        reusable
    :raises TypeError: if any argument is of the wrong type
    :raises ValueError: if `interval_seconds` is out of range (raised by
        the range check, presumably -- see `check_int_range`)

    # Example:
    >>> from contextlib import redirect_stdout
    >>> from io import StringIO
    >>> sio = StringIO()

    >>> def __three(lst=[1, 2, 3, 4, 5, 6]) -> bool:
    ...     if list.__len__(lst) > 0:
    ...         del lst[-1]
    ...         return False
    ...     return True

    >>> with redirect_stdout(sio):
    ...     log_system_state(1, __three)
    >>> v = sio.getvalue().splitlines()
    >>> len(v)
    4
    >>> v[0][:20]
    'now;now.year;now.mon'
    >>> i = list.__len__(v[0].split(CSV_SEPARATOR))
    >>> all(list.__len__(vv.split(CSV_SEPARATOR)) == i for vv in v)
    True

    >>> try:
    ...     log_system_state(1, lock=None)
    ... except TypeError as te:
    ...     print(str(te)[0:60])
    lock should be an instance of contextlib.AbstractContextMana

    >>> try:
    ...     log_system_state(1, should_stop=None)
    ... except TypeError as te:
    ...     print(te)
    should_stop should be a callable but is None.
    """
    interval_seconds = check_int_range(
        interval_seconds, "interval_seconds", 1, 1_000_000_000)
    if not callable(should_stop):
        raise type_error(should_stop, "should_stop", call=True)
    if not isinstance(lock, AbstractContextManager):
        raise type_error(lock, "lock", AbstractContextManager)

    # The header: collect one sample, but keep only the keys.
    keys: Final[list[str]] = []
    collect_system_state(lambda a, _, x=keys.append: x(a))  # type: ignore
    # The lock is held only while writing, never while collecting or
    # sleeping. Output is flushed right away: lines are rare, and a log
    # that is piped to a file should be up-to-date if something crashes.
    with lock:
        print(CSV_SEPARATOR.join(keys), flush=True)  # noqa: T201
    current: Final[dict[str, str]] = {}

    while not should_stop():
        collect_system_state(current.__setitem__)
        # One column per header key: values that are missing in this sample
        # become empty cells instead of shifting the following columns.
        row: str = CSV_SEPARATOR.join(current.get(k, "") for k in keys)
        current.clear()
        with lock:
            print(row, flush=True)  # noqa: T201
        if should_stop():
            return
        sleep(interval_seconds)

306 

307 

# Periodically print the system state if executed as script
if __name__ == "__main__":
    parser: Final[ArgumentParser] = pycommons_argparser(
        __file__,
        "Print the System State",
        "A program printing the state of the system in fixed intervals.")
    # `nargs="?"` permits `--interval` without a value. Without `const`,
    # argparse would then store `None`, which fails the integer check in
    # `log_system_state`. With `const`, a bare `--interval` simply means
    # the default interval.
    parser.add_argument(
        "--interval", nargs="?", type=int, default=300, const=300,
        help="the interval between printing the state in seconds")
    args = parser.parse_args()
    log_system_state(args.interval)