Coverage for pycommons / processes / system_state.py: 81%
110 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:04 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:04 +0000
1"""
2Functionality to log the current system state.
4Here we provide a small program that can be executed concurrently with other
5activities and that logs information about the system state. This may be
6useful when running some computationally heavy experiments to find potential
7problems.
8"""
11import datetime
12from argparse import ArgumentParser
13from contextlib import AbstractContextManager, nullcontext, suppress
14from time import sleep
15from typing import Any, Callable, Final, Iterable
17from psutil import ( # type: ignore
18 cpu_times, # type: ignore
19 cpu_times_percent, # type: ignore
20 disk_partitions, # type: ignore
21 disk_usage, # type: ignore
22 sensors_fans, # type: ignore
23 sensors_temperatures, # type: ignore
24 virtual_memory, # type: ignore
25)
27from pycommons.io.arguments import pycommons_argparser
28from pycommons.io.csv import CSV_SEPARATOR, SCOPE_SEPARATOR
29from pycommons.math.int_math import try_int
30from pycommons.strings.chars import WHITESPACE_OR_NEWLINE
31from pycommons.strings.string_tools import replace_str
32from pycommons.types import check_int_range, type_error
#: Reference to :meth:`datetime.datetime.now`; called once per collection
#: to obtain the time stamp of the recorded system state.
__DTN: Final[Callable[[], datetime.datetime]] = datetime.datetime.now
#: The single characters that `__fix_key` replaces with the
#: `SCOPE_SEPARATOR`: slash, backslash, both curly braces, every character
#: of `WHITESPACE_OR_NEWLINE`, and the `CSV_SEPARATOR`.
__REPL: Final[tuple[str, ...]] = tuple(
    "/\\{}" + WHITESPACE_OR_NEWLINE + CSV_SEPARATOR)
#: Two consecutive scope separators; `__fix_key` reduces these to one.
__DOUBLE_SCOPE: Final[str] = SCOPE_SEPARATOR + SCOPE_SEPARATOR
def __fix_key(key: Any) -> str | None:
    """
    Turn a raw key into a clean, scope-separated key string.

    Non-string keys are rejected. String keys are stripped, every character
    listed in `__REPL` is replaced by the `SCOPE_SEPARATOR`, doubled
    separators are reduced via `replace_str`, and separators at either end
    are removed. If nothing remains, `None` is returned.

    :param key: the raw key, may be any object
    :returns: the cleaned key, or `None` if `key` is not a string or is
        empty after cleaning

    >>> print(__fix_key(None))
    None
    >>> print(__fix_key(1))
    None
    >>> print(__fix_key(""))
    None
    >>> print(__fix_key(" "))
    None
    >>> print(__fix_key("."))
    None
    >>> print(__fix_key(". ."))
    None
    >>> print(__fix_key("...."))
    None
    >>> __fix_key(".d x")
    'd.x'
    >>> __fix_key(".d ..x")
    'd.x'
    >>> __fix_key(".v yd ..x yxc .")
    'v.yd.x.yxc'
    """
    if not isinstance(key, str):
        return None
    text: str = str.strip(key)
    if str.__len__(text) == 0:
        return None

    # map all forbidden characters onto the scope separator
    for forbidden in __REPL:
        text = str.replace(text, forbidden, SCOPE_SEPARATOR)
    text = str.strip(replace_str(__DOUBLE_SCOPE, SCOPE_SEPARATOR, text))

    # peel separators off the front, then off the back, one at a time
    while str.startswith(text, SCOPE_SEPARATOR):
        text = str.strip(text[1:])
    while str.endswith(text, SCOPE_SEPARATOR):
        text = str.strip(text[:-1])
    return text if str.__len__(text) > 0 else None
def __collect_attrs(prefix: str, data: Any, fields: Iterable[str],
                    collector: Callable[[str, str], Any]) -> None:
    """
    Forward selected attributes of an object to a collector.

    For each name in `fields`, the attribute of that name is read from
    `data`. Attributes that are missing or `None` are skipped. The key is
    `prefix` directly followed by the attribute name, cleaned with
    `__fix_key`; the value is the `repr` of the attribute.

    :param prefix: the text put in front of each attribute name
    :param data: the object to read from, e.g., a named tuple; if it is
        `None`, nothing happens
    :param fields: the names of the attributes to read
    :param collector: the callable receiving each `(key, value)` pair

    >>> def __ptr(a: str, b: str) -> None:
    ...     print(f"{a}: {b}")

    >>> __collect_attrs("", None, (), __ptr)

    >>> __collect_attrs("", "a", ("__class__", ), __ptr)
    __class__: <class 'str'>

    >>> __collect_attrs("prefix.", "a", ("__class__", ), __ptr)
    prefix.__class__: <class 'str'>

    >>> __collect_attrs("prefix.", "a", ("__class__", "__class__"), __ptr)
    prefix.__class__: <class 'str'>
    prefix.__class__: <class 'str'>
    """
    if data is None:
        return
    for field_name in fields:
        value: Any = getattr(data, field_name, None)
        if value is None:  # attribute is absent or carries no value
            continue
        fixed: str | None = __fix_key(f"{prefix}{field_name}")
        if fixed is None:
            continue
        collector(fixed, repr(value))
def __collect_struct(prefix: str, data: Any, fields: Iterable[str],
                     collector: Callable[[str, str], Any]) -> None:
    """
    Pass a structured info system to a collector.

    `data` must be a dictionary whose string keys map to iterables of
    records; anything else is silently ignored. In this file, it receives
    the results of `sensors_temperatures` and `sensors_fans`. For each
    record, the key prefix handed to `__collect_attrs` is composed of
    `prefix`, the dictionary key, and, if the record has a non-empty string
    attribute `label`, that label.

    The `prefix` is applied to *every* record. Previously, it was only
    applied to records with a non-empty label, so unlabelled records lost
    their prefix and records of different sources could end up under the
    same key.

    :param prefix: the prefix to use; it is concatenated directly with the
        dictionary key and should therefore end with a scope separator
    :param data: the data record
    :param fields: the fields on the per-row basis
    :param collector: the collector to receive the strings

    >>> def __ptr(a: str, b: str) -> None:
    ...     print(f"{a}: {b}")

    >>> __collect_struct("", None, (), __ptr)

    >>> from collections import namedtuple
    >>> rec = namedtuple("rec", ("label", "current"))
    >>> __collect_struct("t.", {"a": [rec("", 1), rec("x", 2)]},
    ...                  ("current", ), __ptr)
    t.a.current: 1
    t.a.x.current: 2
    """
    if not isinstance(data, dict):
        return
    prefix = str.strip(prefix)
    for key, row in data.items():
        if not (isinstance(key, str) and isinstance(row, Iterable)):
            continue
        # The prefix is part of the name regardless of whether the
        # element is labelled or not.
        base: str = f"{prefix}{str.strip(key)}."
        for element in row:
            if element is None:
                continue
            name: str = base
            label: Any = getattr(element, "label", None)
            if isinstance(label, str):
                label = str.strip(label)
                if str.__len__(label) > 0:
                    name = f"{base}{label}."
            __collect_attrs(name, element, fields, collector)
def collect_system_state(
        collector: Callable[[str, str], Any]) -> None:
    """
    Pass the current state of the system to a collector.

    The state is delivered as a sequence of `(key, value)` string pairs:
    first the current time, then CPU times, per-CPU usage, memory usage,
    disk usage per mount point, temperatures, and fan speeds. Nothing is
    returned.

    Each of the `psutil`-based probes is best-effort: if a probe raises an
    `Exception`, its values are simply left out. `KeyboardInterrupt` and
    `SystemExit` are *not* suppressed, so the process can still be stopped
    while a collection is in progress.

    :param collector: the collector to receive the key-value tuples
    :raises TypeError: if `collector` is not callable

    >>> def __ptr(a: str, b: str) -> None:
    ...     pass

    >>> s = collect_system_state(__ptr)

    >>> try:
    ...     collect_system_state(None)
    ... except TypeError as te:
    ...     print(te)
    collector should be a callable but is None.
    """
    if not callable(collector):
        raise type_error(collector, "collector", call=True)

    now: Final = __DTN()
    collector("now", repr(try_int(now.timestamp())))
    __collect_attrs("now.", now, (
        "year", "month", "day", "hour", "minute", "second", "microsecond"),
        collector)

    # From here on, every probe is optional. We suppress `Exception`
    # rather than `BaseException` so that interrupts and interpreter
    # shutdown requests are never swallowed.
    with suppress(Exception):
        __collect_attrs("cpu_times.", cpu_times(), ("user", "system", "idle"),
                        collector)

    with suppress(Exception):
        cpup: Any = cpu_times_percent(percpu=True)
        if isinstance(cpup, Iterable):
            for i, z in enumerate(cpup):
                __collect_attrs(f"cpu_{i}_usage.", z, (
                    "user", "system", "idle"), collector)

    with suppress(Exception):
        __collect_attrs("memory.", virtual_memory(), (
            "total", "available", "percent", "used", "free"), collector)

    with suppress(Exception):
        dps: Any = disk_partitions(False)
        if isinstance(dps, Iterable):
            for disk in dps:
                if not hasattr(disk, "mountpoint"):
                    continue
                mp = getattr(disk, "mountpoint")
                if not isinstance(mp, str):
                    continue
                # mount points below the snap directories are skipped
                if str.startswith(mp, ("/snap/", "/var/snap/")):
                    continue
                # a failure for one disk must not hide the other disks
                with suppress(Exception):
                    __collect_attrs(f"disk.{mp}.", disk_usage(mp), (
                        "total", "used", "free", "percent"), collector)

    with suppress(Exception):
        __collect_struct("temperature.", sensors_temperatures(False), (
            "current", "high", "critical"), collector)

    with suppress(Exception):
        # The prefix ends with a separator like all other prefixes above;
        # without it, "fan speed" would fuse with the sensor name.
        __collect_struct("fan speed.", sensors_fans(), ("current", ),
                         collector)
def log_system_state(interval_seconds: int = 300,
                     should_stop: Callable[[], bool] = lambda: False,
                     lock: AbstractContextManager = nullcontext()) -> None:
    r"""
    Log the system state periodically to the stdout.

    This function allows for periodic logging of the system state to the
    standard output. This can be launched as a program running besides an
    experiment in order to help tracking potential problems. Let's say that
    your experiment or whatever program crashes for unclear reasons. Why did
    it crash? We don't know. Maybe it crashed because it ran out of memory.
    Maybe it ran out of disk space? Maybe not? Who knows. If you let this
    function here run concurrently to your program and pipe its output to a
    log file, then at least you will be able to see if the system slowly runs
    out of memory, disk space, or if the CPU gets too hot, or something. Or,
    at least, you can rule out that this is the case.

    The output is presented in CSV format. Therefore, you can pipe it to a
    file and later open it in Excel or whatever. This allows you to draw
    diagrams of the usage of CPUs and memory or the temperature of the CPU
    over time.

    The first line printed is the header with the keys found during an
    initial collection. Every following line has exactly one field per
    header key. If a value is not available in a later collection, its
    field is left empty, so that all columns stay aligned. Every line is
    printed while holding `lock` and is flushed immediately.

    :param interval_seconds: the interval seconds
    :param should_stop: a function telling the logger when it should stop;
        it is queried before each collection and again before each sleep
    :param lock: a shared lock for the console access; it is entered once
        per printed line and therefore must be reusable
    :raises TypeError: if `should_stop` is not callable or `lock` is not an
        `AbstractContextManager`

    # Example:
    >>> from contextlib import redirect_stdout
    >>> from io import StringIO
    >>> sio = StringIO()

    >>> def __three(lst=[1, 2, 3, 4, 5, 6]) -> bool:
    ...     if list.__len__(lst) > 0:
    ...         del lst[-1]
    ...         return False
    ...     return True

    >>> with redirect_stdout(sio):
    ...     log_system_state(1, __three)
    >>> v = sio.getvalue().splitlines()
    >>> len(v)
    4
    >>> v[0][:20]
    'now;now.year;now.mon'
    >>> i = list.__len__(v[0].split(CSV_SEPARATOR))
    >>> all(list.__len__(vv.split(CSV_SEPARATOR)) == i for vv in v)
    True

    >>> try:
    ...     log_system_state(1, lock=None)
    ... except TypeError as te:
    ...     print(str(te)[0:60])
    lock should be an instance of contextlib.AbstractContextMana

    >>> try:
    ...     log_system_state(1, should_stop=None)
    ... except TypeError as te:
    ...     print(te)
    should_stop should be a callable but is None.
    """
    interval_seconds = check_int_range(
        interval_seconds, "interval_seconds", 1, 1_000_000_000)
    if not callable(should_stop):
        raise type_error(should_stop, "should_stop", call=True)
    if not isinstance(lock, AbstractContextManager):
        raise type_error(lock, "lock", AbstractContextManager)

    # A first collection determines the header, i.e., the column order.
    keys: Final[list[str]] = []
    collect_system_state(lambda a, _, x=keys.append: x(a))  # type: ignore
    # We flush each line: the log is most valuable right before a crash,
    # so it must not linger in the output buffer between two intervals.
    with lock:
        print(CSV_SEPARATOR.join(keys), flush=True)  # noqa: T201
    current: dict[str, str] = {}

    while not should_stop():
        collect_system_state(current.__setitem__)
        # One field per header key: a value missing in this round becomes
        # an empty field instead of shifting all later columns to the left.
        row: str = CSV_SEPARATOR.join(current.get(k, "") for k in keys)
        current.clear()
        with lock:
            print(row, flush=True)  # noqa: T201
        if should_stop():
            return
        sleep(interval_seconds)
# Run the periodic system state logger if executed as script
if __name__ == "__main__":
    parser: Final[ArgumentParser] = pycommons_argparser(
        __file__,
        "Print the System State",
        "A program printing the state of the system in fixed intervals.")
    # `nargs="?"` permits `--interval` without a value. `const` makes this
    # case fall back to the default interval; without it, argparse would
    # store `None`, which `log_system_state` rejects.
    parser.add_argument(
        "--interval", nargs="?", type=int, default=300, const=300,
        help="the interval between printing the state in seconds")
    args = parser.parse_args()
    log_system_state(args.interval)