Coverage for moptipy / api / _mo_process_no_ss_log.py: 90%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 10:36 +0000

1"""A multi-objective process with logging.""" 

2from typing import Callable, Final, cast # pylint: disable=W0611 

3 

4import numpy as np 

5from numpy import copyto 

6from pycommons.io.path import Path 

7from pycommons.types import type_error 

8 

9from moptipy.api._mo_process_no_ss import _MOProcessNoSS 

10from moptipy.api._process_base import _TIME_IN_NS, _check_log_time 

11from moptipy.api.algorithm import Algorithm 

12from moptipy.api.improvement_logger import ImprovementLogger 

13from moptipy.api.mo_archive import MOArchivePruner 

14from moptipy.api.mo_problem import MOProblem 

15from moptipy.api.space import Space 

16from moptipy.utils.logger import Logger 

17 

18 

class _MOProcessNoSSLog(_MOProcessNoSS):
    """
    A multi-objective process with logging.

    Each call to :meth:`f_evaluate` may add one row of the form
    `[fe_index, time_in_ns, result, copy_of_fs]` to an in-memory log.
    A row is added for every evaluation if `log_all_fes` is `True`;
    otherwise only when `check_in` returns a true value or when a new
    best result was found.
    """

    def __init__(self,
                 solution_space: Space,
                 objective: MOProblem,
                 algorithm: Algorithm,
                 pruner: MOArchivePruner,
                 archive_max_size: int,
                 archive_prune_limit: int,
                 log_file: Path | None = None,
                 rand_seed: int | None = None,
                 max_fes: int | None = None,
                 max_time_millis: int | None = None,
                 goal_f: int | float | None = None,
                 log_all_fes: bool = False,
                 improvement_logger: ImprovementLogger | None = None) \
            -> None:
        """
        Set up the logging process. Not meant to be called directly.

        :param solution_space: the search- and solution space
        :param objective: the multi-objective problem
        :param algorithm: the optimization algorithm
        :param pruner: the archive pruner
        :param archive_max_size: the maximum archive size after pruning
        :param archive_prune_limit: the archive size above which pruning
            will be performed
        :param log_file: the log file; the signature permits `None`, but
            this class raises a type error unless it is a `str` instance
        :param rand_seed: the optional random seed
        :param max_fes: the maximum permitted function evaluations
        :param max_time_millis: the maximum runtime in milliseconds
        :param goal_f: the goal objective value; if it is reached, the
            process is terminated
        :param log_all_fes: `True` to log all FEs, `False` to log only
            improvements
        :param improvement_logger: an improvement logger, whose
            :meth:`~ImprovementLogger.log_improvement` method will be
            invoked whenever the process has registered an improvement
        """
        super().__init__(
            solution_space=solution_space,
            objective=objective,
            algorithm=algorithm,
            pruner=pruner,
            archive_max_size=archive_max_size,
            archive_prune_limit=archive_prune_limit,
            log_file=log_file,
            rand_seed=rand_seed,
            max_fes=max_fes,
            max_time_millis=max_time_millis,
            goal_f=goal_f,
            improvement_logger=improvement_logger)

        # Validate in a fixed order: first the log file, then the flag.
        for value, name, expected in ((log_file, "log_file", str),
                                      (log_all_fes, "log_all_fes", bool)):
            if not isinstance(value, expected):
                raise type_error(value, name, expected)

        #: The in-memory log
        self.__log: list[list[int | float | np.ndarray]] = []
        #: the bound `append` of the log list, cached for quick access
        self.__log_append = self.__log.append
        #: `True` if all FEs are logged, `False` to only log improvements.
        self.__log_all: Final[bool] = log_all_fes

    def f_evaluate(self, x, fs: np.ndarray) -> float | int:
        """
        Evaluate a solution, track the best result, and record log rows.

        :param x: the solution passed on to `_f_evaluate`
        :param fs: the array passed on to `_f_evaluate` together with `x`;
            a copy of it is stored in every log row
        :returns: the value returned by `_f_evaluate`, or the current
            best value if the process has already terminated
        :raises ValueError: if the process has terminated and
            `_knows_that_terminated` is set
        """
        if self._terminated:
            if not self._knows_that_terminated:
                return self._current_best_f
            raise ValueError("The process has been terminated and "
                             "the algorithm knows it.")

        result: Final[int | float] = self._f_evaluate(x, fs)

        # Count this evaluation and see whether the FE budget is used up.
        fe_index = self._current_fes + 1
        self._current_fes = fe_index
        should_stop: bool = fe_index >= self._end_fes
        must_log: bool = self.__log_all
        now_ns: int = 0  # 0 means: the clock was not read yet

        is_new_best: bool = bool(result < self._current_best_f)
        if is_new_best:
            self._current_best_f = result
            copyto(self._current_best_fs, fs)
            self._copy_y(self._current_best_y, x)
            if (not should_stop) and (result <= self._end_f):
                should_stop = True

        # `check_in` must always be invoked, hence it is called before
        # the best-so-far flag is consulted.
        entered = self.check_in(x, fs, True)
        if entered or is_new_best:
            self._last_improvement_fe = fe_index
            now_ns = _TIME_IN_NS()
            self._current_time_nanos = now_ns
            self._last_improvement_time_nanos = now_ns
            must_log = True
            if self._log_improvement:
                # Default arguments bind the current values to the lambda.
                self._log_improvement(cast(
                    "Callable[[Logger], None]",
                    lambda lg, _x=x, _f=result, _fs=fs:
                    self._write_improvement(lg, None, _x, _f, _fs)))

        if must_log:
            if now_ns <= 0:  # no time stamp taken above: take one now
                now_ns = _TIME_IN_NS()
                self._current_time_nanos = now_ns
            self.__log_append([fe_index, now_ns, result, fs.copy()])

        if should_stop:
            self.terminate()

        return result

    def _check_timing(self) -> None:
        """Run the inherited timing check, then check the log's times."""
        super()._check_timing()
        _check_log_time(
            self._start_time_nanos, self._current_time_nanos, self.__log)

    def _write_log(self, logger: Logger) -> None:
        """
        Write the in-memory log, drop it, then write the inherited parts.

        :param logger: the logger to write to
        """
        self._write_mo_log(
            self.__log, self._start_time_nanos, self.__log_all, logger)
        del self.__log
        super()._write_log(logger)

    def __str__(self) -> str:
        """
        Get the name of this process implementation.

        :returns: the fixed string `MOLoggingProcessWithoutSearchSpace`
        """
        return "MOLoggingProcessWithoutSearchSpace"