Coverage for moptipy / api / _process_base.py: 85%
314 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""An internal module with the base class for implementing Processes."""
2from io import StringIO
3from math import inf, isfinite
4from threading import Lock, Timer
5from time import time_ns
6from traceback import print_tb
7from typing import Any, Callable, Final, cast
9from numpy.random import Generator
10from pycommons.io.path import Path
11from pycommons.types import type_error, type_name_of
13from moptipy.api.algorithm import Algorithm, check_algorithm
14from moptipy.api.logging import (
15 _ALL_SECTIONS,
16 ERROR_SECTION_PREFIX,
17 KEY_BEST_F,
18 KEY_EXCEPTION_STACK_TRACE,
19 KEY_EXCEPTION_TYPE,
20 KEY_EXCEPTION_VALUE,
21 KEY_GOAL_F,
22 KEY_LAST_IMPROVEMENT_FE,
23 KEY_LAST_IMPROVEMENT_TIME_MILLIS,
24 KEY_MAX_FES,
25 KEY_MAX_TIME_MILLIS,
26 KEY_RAND_BIT_GENERATOR_TYPE,
27 KEY_RAND_GENERATOR_TYPE,
28 KEY_RAND_SEED,
29 KEY_TOTAL_FES,
30 KEY_TOTAL_TIME_MILLIS,
31 SCOPE_ALGORITHM,
32 SCOPE_OBJECTIVE_FUNCTION,
33 SCOPE_PROCESS,
34 SCOPE_SOLUTION_SPACE,
35 SECTION_ERROR_BEST_F,
36 SECTION_ERROR_IN_CONTEXT,
37 SECTION_ERROR_IN_LOG,
38 SECTION_ERROR_IN_RUN,
39 SECTION_ERROR_INVALID_X,
40 SECTION_ERROR_INVALID_Y,
41 SECTION_ERROR_TIMING,
42 SECTION_FINAL_STATE,
43 SECTION_RESULT_Y,
44 SECTION_SETUP,
45)
46from moptipy.api.objective import Objective, check_objective
47from moptipy.api.process import (
48 Process,
49 check_goal_f,
50 check_max_fes,
51 check_max_time_millis,
52)
53from moptipy.api.space import Space, check_space
54from moptipy.utils.logger import (
55 SECTION_END,
56 SECTION_START,
57 FileLogger,
58 KeyValueLogSection,
59 Logger,
60)
61from moptipy.utils.nputils import (
62 rand_generator,
63 rand_seed_check,
64 rand_seed_generate,
65)
66from moptipy.utils.sys_info import log_sys_info
def _error_1(logger: Logger, title: str, exception_type,
             exception_value, traceback,
             error_repl: str = f"{ERROR_SECTION_PREFIX!r}") -> None:
    """
    Write a text section describing an error, context-manager style.

    The three error arguments have the same meaning as the arguments
    passed to ``__exit__``. Nothing at all is written if all three of
    them are falsy.

    :param logger: the logger to write to
    :param title: the title of the text section to create
    :param exception_type: the exception type; either a string (where a
        leading ``<class '`` and the trailing two characters are cut off)
        or any other object, whose type name is then used
    :param exception_value: the exception value, logged as stripped string
    :param traceback: the traceback, logged line by line without blank
        lines
    :param error_repl: the text that replaces every occurrence of the
        error section prefix inside the stack trace lines
    """
    if not (exception_type or exception_value or traceback):
        return  # there is nothing to report

    with logger.text(title=title) as section:
        emit: Final[Callable[[str], None]] = section.write

        if exception_type:
            if not isinstance(exception_type, str):
                type_text = type_name_of(exception_type)
            elif exception_type.startswith("<class '"):
                # turn "<class 'X'>" into "X"
                type_text = exception_type[8:-2]
            else:
                type_text = exception_type
            emit(f"{KEY_EXCEPTION_TYPE}: {str.strip(type_text)}")

        if exception_value:
            value_text = str.strip(str(exception_value))
            emit(f"{KEY_EXCEPTION_VALUE}: {value_text}")

        if traceback:
            buffer: Final[StringIO] = StringIO()
            print_tb(traceback, file=buffer)
            # Strip each line, drop empty ones, and make sure that no line
            # contains the error section prefix literally.
            frames: Final[list[str]] = [
                str.replace(stripped, ERROR_SECTION_PREFIX, error_repl)
                for stripped in map(
                    str.strip, str.splitlines(buffer.getvalue()))
                if stripped]
            if frames:
                emit(f"{KEY_EXCEPTION_STACK_TRACE}:")
                for frame in frames:
                    emit(frame)
def _error_2(logger: Logger, title: str, exception: Exception) -> None:
    """
    Write a text section describing a caught exception.

    The exception object itself is handed on as the "type" (so that its
    type name gets logged), its string representation as the value, and
    its ``__traceback__`` as the traceback.

    :param logger: the logger to write to
    :param title: the title of the text section to create
    :param exception: the exception to log

    >>> from moptipy.utils.logger import Logger
    >>> def __do_print(s: str) -> None:
    ...     s = str.strip(s)
    ...     if "~~^~~" not in s:
    ...         print(s)
    >>> ime = Logger("pl", __do_print)
    >>> def k():
    ...     1 / 0
    >>> try:
    ...     k()
    ... except Exception as be:
    ...     _error_2(ime, "ERROR", be)
    BEGIN_ERROR
    exceptionType: ZeroDivisionError
    exceptionValue: division by zero
    exceptionStackTrace:
    File "<doctest moptipy.api._process_base._error_2[4]>", line 2, in \
<module>
    k()
    File "<doctest moptipy.api._process_base._error_2[3]>", line 2, in k
    1 / 0
    END_ERROR
    """
    message = str(exception)
    trace = exception.__traceback__
    _error_1(logger, title, exception, message, trace)
#: The function used to get the time: a callable without arguments that
#: returns the current time in nanoseconds as `int`. It is bound to
#: :func:`time.time_ns`; all time measurements in this module go through
#: this name.
_TIME_IN_NS: Final[Callable[[], int]] = time_ns
153def _ns_to_ms(nanos: int) -> int:
154 """
155 Convert nanoseconds to milliseconds by rounding up.
157 :param int nanos: the nanoseconds
158 :returns: the corresponding milliseconds, rounded up
160 >>> _ns_to_ms(0)
161 0
162 >>> _ns_to_ms(1)
163 1
164 >>> _ns_to_ms(999_999)
165 1
166 >>> _ns_to_ms(1_000_000)
167 1
168 >>> _ns_to_ms(1_000_001)
169 2
170 >>> _ns_to_ms(1_999_999)
171 2
172 >>> _ns_to_ms(2_000_000)
173 2
174 >>> _ns_to_ms(2_000_001)
175 3
176 """
177 return (nanos + 999_999) // 1_000_000
class _ProcessBase(Process):
    """
    The internal base class for implementing optimization processes.

    An instance stores the termination criteria (budget of objective
    function evaluations, time budget, goal objective value), the random
    number generator, the record of the best-so-far solution, and the
    optional log file. When the process is exited as context manager, the
    final state is validated and, if a log file was specified, the log is
    written.
    """

    def __init__(self,
                 solution_space: Space,
                 objective: Objective,
                 algorithm: Algorithm,
                 log_file: Path | None = None,
                 rand_seed: int | None = None,
                 max_fes: int | None = None,
                 max_time_millis: int | None = None,
                 goal_f: int | float | None = None) -> None:
        """
        Perform the internal initialization. Do not call directly.

        :param solution_space: the search- and solution space.
        :param objective: the objective function
        :param algorithm: the optimization algorithm
        :param log_file: the optional log file
        :param rand_seed: the optional random seed
        :param max_fes: the maximum permitted function evaluations
        :param max_time_millis: the maximum runtime in milliseconds
        :param goal_f: the goal objective value. if it is reached, the process
            is terminated
        :raises TypeError: if `log_file` is neither `None` nor a `Path`
        """
        super().__init__()
        #: This will be `True` after :meth:`terminate` has been called.
        self._terminated: bool = False
        #: This becomes `True` when :meth:`should_terminate` returned `True`.
        self._knows_that_terminated: bool = False
        #: The internal lock, needed to protect :meth:`terminate`.
        self.__lock: Final[Lock] = Lock()
        #: The maximum FEs.
        self._max_fes: Final[int | None] = check_max_fes(max_fes, True)
        #: A version of :attr:`_max_fes` that can be used in comparisons.
        self._end_fes: Final[int] = 9_223_372_036_854_775_800 \
            if (self._max_fes is None) else self._max_fes
        #: The goal objective value.
        self._goal_f: Final[int | float | None] = \
            check_goal_f(goal_f, True)
        #: A comparable version of :attr:`self._goal_f`.
        self._end_f: int | float = \
            -inf if (self._goal_f is None) else self._goal_f
        #: The currently consumed nanoseconds.
        self._current_time_nanos: int = 0
        #: The currently consumed objective function evaluations (FEs).
        self._current_fes: int = 0
        #: The time (in nanoseconds) when the last improvement was made.
        self._last_improvement_time_nanos: int = -1
        #: The FE when the last improvement was made.
        self._last_improvement_fe: int = -1

        #: The solution space, i.e., the data structure of possible solutions.
        self._solution_space: Final[Space] = check_space(solution_space)
        #: The objective function rating candidate solutions.
        self.__objective: Final[Objective] = check_objective(objective)
        #: the internal invoker for the objective function
        self._f: Final[Callable[[Any], int | float]] = \
            self.__objective.evaluate
        #: The algorithm to be applied.
        self.__algorithm: Final[Algorithm] = check_algorithm(algorithm)
        #: The random seed.
        self.__rand_seed: Final[int] = rand_seed_generate() \
            if rand_seed is None \
            else rand_seed_check(rand_seed)
        #: The random number generator.
        self.__random: Final[Generator] = rand_generator(self.__rand_seed)
        #: The current best solution.
        self._current_best_y = solution_space.create()
        #: The current best objective value
        self._current_best_f: int | float = inf
        #: The log file, or `None` if no log file is needed
        if (log_file is not None) and (not isinstance(log_file, Path)):
            raise type_error(log_file, "log_file", Path)
        self.__log_file: Final[Path | None] = log_file
        #: the method for copying y
        self._copy_y: Final[Callable] = solution_space.copy
        #: set up the method forwards
        self.lower_bound = self.__objective.lower_bound  # type: ignore
        if self._end_f <= -inf:
            # No goal was given: use the lower bound of the objective as
            # goal and answer later lower bound queries with that value.
            self._end_f = check_goal_f(self.lower_bound())
            self.lower_bound = lambda: self._end_f  # type: ignore
        self.upper_bound = objective.upper_bound  # type: ignore
        self.is_always_integer = objective.is_always_integer  # type: ignore
        self.create = solution_space.create  # type: ignore
        self.copy = solution_space.copy  # type: ignore
        self.to_str = solution_space.to_str  # type: ignore
        self.is_equal = solution_space.is_equal  # type: ignore
        self.from_str = solution_space.from_str  # type: ignore
        self.n_points = solution_space.n_points  # type: ignore
        self.validate = solution_space.validate  # type: ignore
        #: the internal section logger: maps section titles to their text,
        #: or is `None` if no log file was specified
        self.__sections: dict[str, str] | None = \
            None if log_file is None else {}

        #: The time when the process was started, in nanoseconds.
        self._start_time_nanos: Final[int] = _TIME_IN_NS()
        #: The maximum runtime in milliseconds.
        self._max_time_millis: Final[int | None] = \
            check_max_time_millis(max_time_millis, True)
        #: A comparable version of :attr:`_max_time_millis`, but representing
        #: the end time in nanoseconds rounded to the next highest
        #: millisecond.
        self._end_time_nanos: Final[float | int] = \
            inf if (self._max_time_millis is None) else \
            _ns_to_ms(int(self._start_time_nanos
                          + (1_000_000 * self._max_time_millis))) \
            * 1_000_000
        #: The timer until the end-of-run, or `None` if there is no end time.
        self.__timer: Final[Timer | None] = None \
            if (self._max_time_millis is None) else \
            Timer(interval=self._max_time_millis / 1_000.0,
                  function=self.terminate)

        #: an internal base exception caught by the algorithm execution
        self._caught: Exception | None = None

    def _after_init(self) -> None:
        """
        Finish initialization, start timer for termination if needed.

        Internal method that must be called after __init__ is completed.
        """
        if self.__timer is not None:
            self.__timer.start()

    def get_log_basename(self) -> str | None:
        """
        Get the path of the log file without its file name extension.

        Everything from the last `"."` onwards is cut off, provided that
        this dot is neither the first character nor located before the
        last `"/"`. Otherwise, the path is returned unchanged.

        :returns: the log file base name, or `None` if there is no log file
        """
        lf: Final[str | None] = self.__log_file
        if lf is None:
            return None
        lid = lf.rfind(".")
        lis = lf.rfind("/")
        return lf[:lid] if (lid > 0) and (lid > lis) else lf

    def get_random(self) -> Generator:
        """
        Get the random number generator of this process.

        :returns: the generator created from the random seed
        """
        return self.__random

    def should_terminate(self) -> bool:
        """
        Check whether the process has been terminated.

        If `True` is returned, the process also remembers that the caller
        was told about the termination.

        :returns: `True` if :meth:`terminate` was called, `False` otherwise
        """
        if self._terminated:
            self._knows_that_terminated = True
            return True
        return False

    def get_consumed_fes(self) -> int:
        """
        Get the number of consumed objective function evaluations.

        :returns: the current FE counter
        """
        return self._current_fes

    def get_consumed_time_millis(self) -> int:
        """
        Get the consumed runtime in milliseconds, rounded up.

        As long as the process is not terminated, the clock is read and
        the process is terminated if the end time has been reached. After
        termination, the last recorded time is used.

        :returns: the milliseconds between start and the recorded time
        """
        if not self._terminated:
            self._current_time_nanos = time = _TIME_IN_NS()
            if time >= self._end_time_nanos:
                self.terminate()
        return _ns_to_ms(self._current_time_nanos - self._start_time_nanos)

    def get_max_time_millis(self) -> int | None:
        """
        Get the maximum runtime in milliseconds.

        :returns: the time limit, or `None` if none was specified
        """
        return self._max_time_millis

    def get_max_fes(self) -> int | None:
        """
        Get the maximum number of objective function evaluations.

        :returns: the FE limit, or `None` if none was specified
        """
        return self._max_fes

    def get_last_improvement_fe(self) -> int:
        """
        Get the FE at which the last improvement was made.

        :returns: the index of the last improving FE
        :raises ValueError: if no improvement has been recorded yet
        """
        if self._last_improvement_fe < 0:
            raise ValueError("Did not perform FE yet, cannot query "
                             "last improvement FE.")
        return self._last_improvement_fe

    def get_last_improvement_time_millis(self) -> int:
        """
        Get the time of the last improvement in milliseconds since start.

        :returns: the milliseconds (rounded up) between the start of the
            process and the last improvement
        :raises ValueError: if no improvement has been recorded yet
        """
        if self._last_improvement_time_nanos < 0:
            raise ValueError("Did not perform FE yet, cannot query "
                             "last improvement time.")
        return _ns_to_ms(self._last_improvement_time_nanos
                         - self._start_time_nanos)

    def has_best(self) -> bool:
        """
        Check whether a best-so-far solution is available.

        :returns: `True` if at least one FE was consumed, else `False`
        """
        return self._current_fes > 0

    def get_best_f(self) -> int | float:
        """
        Get the best-so-far objective value.

        :returns: the best objective value recorded
        :raises ValueError: if no FE was consumed yet
        """
        if self._current_fes > 0:
            return self._current_best_f
        raise ValueError("No current best available.")

    def get_copy_of_best_x(self, x) -> None:
        """
        Copy the best-so-far solution into `x`.

        In this base class, the copy method of the solution space is
        applied to the stored best solution.

        :param x: the destination data structure to be overwritten
        :raises ValueError: if no FE was consumed yet
        """
        if self._current_fes > 0:
            return self._copy_y(x, self._current_best_y)
        raise ValueError("No current best available.")

    def terminate(self) -> None:
        """
        Terminate the process.

        Only the first call has an effect: it marks the process as
        terminated, cancels and discards the timer (if any), and records
        the current time. All later calls return immediately.
        """
        # NOTE(review): the lock scope was reconstructed from a flattened
        # listing; confirm that all statements belong inside the lock.
        with self.__lock:
            old_terminated = self._terminated
            self._terminated = True
            if old_terminated:
                return
            if self.__timer is not None:
                self.__timer.cancel()
            del self.__timer
            self._current_time_nanos = _TIME_IN_NS()

    def get_copy_of_best_y(self, y) -> None:
        """
        Get a copy of the current best point in the solution space.

        This method in this internal class just forwards to
        :meth:`get_copy_of_best_x`.

        :param y: the destination data structure to be overwritten
        """
        return self.get_copy_of_best_x(y)

    def _log_own_parameters(self, logger: KeyValueLogSection) -> None:
        """
        Write the parameters of this process to the logger.

        This includes the limits on runtime and FEs.

        :param logger: the logger
        """
        super().log_parameters_to(logger)
        if self._max_fes is not None:
            logger.key_value(KEY_MAX_FES, self._max_fes)
        if self._max_time_millis is not None:
            logger.key_value(KEY_MAX_TIME_MILLIS, self._max_time_millis)
        if self._goal_f is not None:
            logger.key_value(KEY_GOAL_F, self._goal_f)
        logger.key_value(KEY_RAND_SEED, self.__rand_seed, True)
        logger.key_value(KEY_RAND_GENERATOR_TYPE, type_name_of(self.__random))
        logger.key_value(KEY_RAND_BIT_GENERATOR_TYPE,
                         type_name_of(self.__random.bit_generator))

    def log_parameters_to(self, logger: KeyValueLogSection) -> None:
        """
        Write the standard parameters of this process to the logger.

        This includes the limits on runtime and FEs.

        :param logger: the logger
        """
        with logger.scope(SCOPE_PROCESS) as sc:
            self._log_own_parameters(sc)
        with logger.scope(SCOPE_ALGORITHM) as sc:
            self.__algorithm.log_parameters_to(sc)
        with logger.scope(SCOPE_SOLUTION_SPACE) as sc:
            self._solution_space.log_parameters_to(sc)
        with logger.scope(SCOPE_OBJECTIVE_FUNCTION) as sc:
            self.__objective.log_parameters_to(sc)

    def add_log_section(self, title: str, text: str) -> None:
        """
        Add a section to the log, if a log is written (otherwise ignore it).

        The title and text are validated even if no log is written.

        :param title: the title of the log section
        :param text: the text to log
        :raises TypeError: if `title` or `text` is not a string
        :raises ValueError: if the title is empty, contains white space,
            is reserved, is not in upper case, contains characters other
            than `A-Z`, `0-9`, and `_`, or was already used, or if the
            text contains a section start or end marker
        """
        if not isinstance(title, str):
            raise type_error(title, "title", str)
        t = title.strip()
        if (len(t) != len(title)) or (len(t) <= 0) or (" " in t) \
                or ("\n" in t) or ("\t" in t):
            raise ValueError("section title must not be empty or contain "
                             f"white space, but {title!r} is/does.")
        if (t in _ALL_SECTIONS) or (SECTION_START in t) or (SECTION_END in t):
            raise ValueError(f"title {t!r} is a reserved section title")
        if t.upper() != t:
            raise ValueError("section titles must be in upper case, "
                             f"but yours is {t!r} (vs. {t.upper()!r}).")
        for ch in t:  # check all character codes in t
            code: int = ord(ch)  # we will only permit A-Z, 0-9, and _
            if not ((65 <= code <= 90) or (48 <= code <= 57) or (code == 95)):
                raise ValueError(
                    f"{ch!r} forbidden in section title, but got {t!r}.")
        if not isinstance(text, str):
            raise type_error(text, "text", str)
        if (SECTION_START in text) or (SECTION_END in text):
            raise ValueError(
                f"text of section {t!r} must not contain {SECTION_START!r} or"
                f" {SECTION_END!r} but is {text!r}")
        if self.__sections is not None:
            if title in self.__sections:
                raise ValueError(f"section {title!r} already logged.")
            self.__sections[title] = text.strip()

    def _log_best(self, kv: KeyValueLogSection) -> None:
        """
        Log the best solution.

        This writes the best objective value as well as the FE and the
        time (in milliseconds since start) of the last improvement.

        :param kv: the key values logger
        """
        kv.key_value(KEY_BEST_F, self._current_best_f)
        kv.key_value(KEY_LAST_IMPROVEMENT_FE,
                     self._last_improvement_fe)
        kv.key_value(KEY_LAST_IMPROVEMENT_TIME_MILLIS,
                     _ns_to_ms(self._last_improvement_time_nanos
                               - self._start_time_nanos))

    def _write_result(self, logger: Logger) -> None:
        """
        Write the end result into the log.

        :param logger: the logger
        """
        with logger.text(SECTION_RESULT_Y) as txt:
            txt.write(self._solution_space.to_str(self._current_best_y))

    def _write_log(self, logger: Logger) -> None:
        """
        Write the information gathered during optimization into the log.

        The final state, the setup, and the system information are always
        written. The best solution is only written if at least one FE was
        consumed.

        :param logger: the logger
        """
        with logger.key_values(SECTION_FINAL_STATE) as kv:
            kv.key_value(KEY_TOTAL_FES, self._current_fes)
            kv.key_value(KEY_TOTAL_TIME_MILLIS,
                         _ns_to_ms(self._current_time_nanos
                                   - self._start_time_nanos))
            if self._current_fes > 0:
                self._log_best(kv)

        with logger.key_values(SECTION_SETUP) as kv:
            self.log_parameters_to(kv)

        log_sys_info(logger)

        if self._current_fes > 0:
            self._write_result(logger)

    def _validate_x(self) -> None:
        """Validate x, if it exists. This base implementation does nothing."""

    def _check_timing(self) -> None:
        """
        Check whether there has been any timing errors.

        :raises ValueError: if there is any timing error
        """
        if self._current_time_nanos < self._start_time_nanos:
            raise ValueError(
                f"current_time_nanos={self._current_time_nanos} < "
                f"start_time_nanos={self._start_time_nanos}")
        if self._current_fes <= 0:
            raise ValueError("no FE was performed")
        if self._current_fes < self._last_improvement_fe:
            raise ValueError(
                f"current_fe={self._current_fes} < "
                f"last_improvement_fe={self._last_improvement_fe}")
        if self._current_time_nanos < self._last_improvement_time_nanos:
            raise ValueError(
                f"current_time_nanos={self._current_time_nanos} < "
                "last_improvement_time_nanos="
                f"{self._last_improvement_time_nanos}")

    def _validate_best_f(self) -> None:
        """
        Validate the best encountered objective value.

        The best solution is evaluated again. The result must equal the
        recorded best objective value, must be finite, and must lie within
        the bounds reported by the objective function.

        :raises ValueError: if there is an error
        """
        ff: Final[int | float] = self._f(self._current_best_y)
        if ff != self._current_best_f:
            raise ValueError(  # noqa
                "We re-computed the objective value of the best solution"
                f" and got {ff}, but it has been registered as "
                f"{self._current_best_f}!")  # noqa
        if not isfinite(ff):
            raise ValueError(  # noqa
                f"The objective value {ff} of "  # noqa
                "the best solution is not finite?")
        lb: Final[int | float] = self.__objective.lower_bound()
        ub: Final[int | float] = self.__objective.upper_bound()
        if not (lb <= ff <= ub):
            raise ValueError(  # noqa
                f"The objective value {ff} of "  # noqa
                "the best solution is not within the lower and "
                f"upper bound, i.e., [{lb}, {ub}]?")  # noqa

    def has_log(self) -> bool:
        """
        Check if this log has an associated log file.

        :retval `True`: if the process is associated with a log output
        :retval `False`: if no information is stored in a log output
        """
        return self.__log_file is not None

    def __exit__(self, exception_type, exception_value, traceback) -> None:
        """
        Exit the process and write the log if necessary.

        The final state is validated first; the errors found are collected
        instead of raised. If a log file was specified, the log, all
        collected errors, and the additional log sections are written to
        it. Finally, if the context was not left due to an exception, the
        first collected error (if any) is raised.

        :param exception_type: the type of the exception that ended the
            context, if any
        :param exception_value: the value of that exception, if any
        :param traceback: the traceback of that exception, if any
        """
        # noinspection PyProtectedMember
        super().__exit__(exception_type, exception_value, traceback)

        # Update the total consumed time, but not include the error checks
        # below.
        self._current_time_nanos = _TIME_IN_NS()

        y_error: Exception | None = None  # error in solution?
        v_error: Exception | None = None  # error in objective value?
        x_error: Exception | None = None  # error in search space?
        t_error: Exception | None = None  # error in timing?
        log_error: Exception | None = None  # error while logging?
        # NOTE(review): the nesting of the following checks was
        # reconstructed from a flattened listing; confirm which of them are
        # guarded by the `_current_fes > 0` condition.
        try:
            self._solution_space.validate(self._current_best_y)
        except Exception as be:  # noqa: BLE001
            y_error = be
        if self._current_fes > 0:
            try:
                self._validate_best_f()
            except Exception as be:  # noqa: BLE001
                v_error = be
            try:
                self._validate_x()
            except Exception as be:  # noqa: BLE001
                x_error = be
        try:
            self._check_timing()
        except Exception as be:  # noqa: BLE001
            t_error = be

        if self.__log_file is not None:
            with FileLogger(self.__log_file) as logger:
                try:
                    self._write_log(logger)
                except Exception as be:  # noqa: BLE001
                    log_error = be

                if self._caught is not None:
                    _error_2(logger, SECTION_ERROR_IN_RUN,
                             self._caught)
                if exception_type or exception_value or traceback:
                    _error_1(logger, SECTION_ERROR_IN_CONTEXT,
                             exception_type, exception_value, traceback)
                if y_error:
                    _error_2(logger, SECTION_ERROR_INVALID_Y, y_error)
                if v_error:
                    _error_2(logger, SECTION_ERROR_BEST_F, v_error)
                if x_error:
                    _error_2(logger, SECTION_ERROR_INVALID_X, x_error)
                if t_error:
                    _error_2(logger, SECTION_ERROR_TIMING, t_error)
                if log_error:
                    _error_2(logger, SECTION_ERROR_IN_LOG, log_error)

                # flush all the additional log sections at the end,
                # ordered by their titles
                for t in sorted(self.__sections):
                    with logger.text(t) as sec:
                        sec.write(self.__sections[t])
            # NOTE(review): placement reconstructed; the sections are
            # discarded once they have been written.
            del self.__sections

        if not exception_type:
            # if no error happened when closing the process, raise any error
            # caught during validation.
            if self._caught is not None:
                raise self._caught  # pylint: disable=[E0702]
            if y_error:
                raise y_error
            if v_error:
                raise v_error
            if x_error:
                raise x_error
            if t_error:
                raise t_error
            if log_error:
                raise log_error

    def __str__(self) -> str:
        """
        Get the name of this process implementation.

        :return: "baseProcess"
        """
        return "baseProcess"
644def _check_log_time(start_time: int, current_time: int,
645 log: list[list]) -> None:
646 """
647 Check the time inside the log.
649 :param start_time: the start time
650 :param current_time: the current time
651 :param log: the log
652 :raises ValueError: if there is a timing error in the log
653 """
654 last_time: int = -1
655 last_fe: int = -1
656 for row in log:
657 fes: int = cast("int", row[0])
658 time: int = cast("int", row[1])
659 if fes < last_fe:
660 raise ValueError(f"fe={fes} after fe={last_fe}?")
661 if time < last_time:
662 raise ValueError(
663 f"time={time} of fe={fes} is less than "
664 f"last_time={last_time} of last_fe={last_fe}")
665 if time < start_time:
666 raise ValueError(
667 f"time={time} of fe={fes} is less than "
668 f"start_time_nanos={start_time}")
669 if time > current_time:
670 raise ValueError(
671 f"time={time} of fe={fes} is greater than "
672 f"current_time_nanos={current_time}")
673 last_time = time
674 last_fe = fes