Coverage for moptipy / api / _mo_process_no_ss.py: 93%
165 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""Providing a multi-objective process without logging with a single space."""
3from math import isfinite
4from typing import Any, Callable, Final, cast
6import numpy as np
7from numpy import copyto
8from pycommons.io.path import Path
9from pycommons.types import check_int_range, type_error
11from moptipy.api._process_base import _TIME_IN_NS, _ns_to_ms, _ProcessBase
12from moptipy.api.algorithm import Algorithm
13from moptipy.api.logging import (
14 KEY_ARCHIVE_F,
15 KEY_ARCHIVE_MAX_SIZE,
16 KEY_ARCHIVE_PRUNE_LIMIT,
17 KEY_ARCHIVE_SIZE,
18 KEY_BEST_FS,
19 PREFIX_SECTION_ARCHIVE,
20 PROGRESS_CURRENT_F,
21 PROGRESS_FES,
22 PROGRESS_TIME_MILLIS,
23 SCOPE_PRUNER,
24 SECTION_ARCHIVE_QUALITY,
25 SECTION_PROGRESS,
26 SUFFIX_SECTION_ARCHIVE_Y,
27)
28from moptipy.api.mo_archive import (
29 MOArchivePruner,
30 MORecord,
31 check_mo_archive_pruner,
32)
33from moptipy.api.mo_problem import MOProblem
34from moptipy.api.mo_process import MOProcess
35from moptipy.api.space import Space
36from moptipy.utils.logger import KeyValueLogSection, Logger
37from moptipy.utils.nputils import array_to_str, np_to_py_number
class _MOProcessNoSS(MOProcess, _ProcessBase):
    """
    An internal process implementation for multi-objective optimization.

    This stand-alone process performs no explicit logging of algorithm
    progress and uses one single space as both search and solution space.
    """

    def __init__(self,
                 solution_space: Space,
                 objective: MOProblem,
                 algorithm: Algorithm,
                 pruner: MOArchivePruner,
                 archive_max_size: int,
                 archive_prune_limit: int,
                 log_file: Path | None = None,
                 rand_seed: int | None = None,
                 max_fes: int | None = None,
                 max_time_millis: int | None = None,
                 goal_f: int | float | None = None) -> None:
        """
        Perform the internal initialization. Do not call directly.

        :param solution_space: the search- and solution space.
        :param objective: the multi-objective problem to optimize
        :param algorithm: the optimization algorithm
        :param pruner: the archive pruner
        :param archive_max_size: the maximum archive size after pruning
        :param archive_prune_limit: the archive size above which pruning will
            be performed
        :param log_file: the optional log file
        :param rand_seed: the optional random seed
        :param max_fes: the maximum permitted function evaluations
        :param max_time_millis: the maximum runtime in milliseconds
        :param goal_f: the goal objective value. if it is reached, the process
            is terminated
        """
        _ProcessBase.__init__(
            self, solution_space, objective, algorithm, log_file, rand_seed,
            max_fes, max_time_millis, goal_f)
        # Forward the multi-objective API of the problem directly so that
        # calls on this process go straight to the objective's methods.
        self.f_dimension = objective.f_dimension  # type: ignore
        self.f_create = objective.f_create  # type: ignore
        self.f_validate = objective.f_validate  # type: ignore
        self.f_dtype = objective.f_dtype  # type: ignore
        self.f_dominates = objective.f_dominates  # type: ignore
        #: the internal evaluation function
        self._f_evaluate: Final[Callable[
            [Any, np.ndarray], int | float]] = objective.f_evaluate
        #: the temporary variable for objective function evaluations
        self._fs_temp: Final[np.ndarray] = self.f_create()
        #: the holder for the objective vector of the current best solution
        self._current_best_fs: Final[np.ndarray] = self.f_create()
        #: the internal archive pruner
        self._pruner: Final[MOArchivePruner] = check_mo_archive_pruner(pruner)
        #: the fast call to the pruning routine
        self._prune: Final[Callable[[list[MORecord], int, int], None]] \
            = pruner.prune
        #: the archive prune limit; checked first so its error takes priority
        self._archive_prune_limit: Final[int] = check_int_range(
            archive_prune_limit, "archive_prune_limit")
        #: the maximum archive size; must be in 1..archive_prune_limit
        self._archive_max_size: Final[int] = check_int_range(
            archive_max_size, "archive_max_size", 1, archive_prune_limit)
        #: the current archive size
        self._archive_size: int = 0
        #: the internal archive; records are pre-allocated in `_after_init`
        self._archive: Final[list[MORecord]] = []
110 def _after_init(self) -> None:
111 self._archive.extend(
112 MORecord(self.create(), self.f_create())
113 for _ in range(self._archive_prune_limit))
114 super()._after_init()
116 def check_in(self, x: Any, fs: np.ndarray,
117 prune_if_necessary: bool = False) -> bool:
118 """
119 Check a solution into the archive.
121 :param x: the point in the search space
122 :param fs: the vector of objective values
123 :param prune_if_necessary: should we prune the archive if it becomes
124 too large? `False` means that the archive may grow unbounded
125 :returns: `True` if the solution was non-dominated, `False` if it was
126 dominated by at least one solution in the archive
127 """
128 archive: Final[list[MORecord]] = self._archive
129 added_to_archive: bool = False
130 archive_size: int = self._archive_size
131 # we update the archive
132 domination: Final[Callable[[np.ndarray, np.ndarray], int]] \
133 = self.f_dominates
134 for i in range(archive_size - 1, -1, -1):
135 ae: MORecord = archive[i]
136 d: int = domination(fs, ae.fs)
137 if d < 0: # the new solution dominates an archived one
138 if added_to_archive: # if already added, shrink archive
139 archive_size -= 1
140 archive[archive_size], archive[i] = \
141 ae, archive[archive_size]
142 else: # if not added, overwrite dominated solution
143 self.copy(ae.x, x)
144 copyto(ae.fs, fs)
145 added_to_archive = True
146 elif d > 0:
147 return False
149 if added_to_archive: # already added, can quit
150 self._archive_size = archive_size
151 else: # still need to add
152 if archive_size >= len(archive):
153 ae = MORecord(self.create(), self.f_create())
154 archive.append(ae)
155 else:
156 ae = archive[archive_size]
157 self.copy(ae.x, x)
158 copyto(ae.fs, fs)
159 archive_size += 1
160 if prune_if_necessary \
161 and (archive_size > self._archive_prune_limit):
162 self._prune(archive, self._archive_max_size, archive_size)
163 self._archive_size = self._archive_max_size
164 else:
165 self._archive_size = archive_size
166 return True
168 def f_evaluate(self, x, fs: np.ndarray) -> float | int:
169 if self._terminated:
170 if self._knows_that_terminated:
171 raise ValueError("The process has been terminated and "
172 "the algorithm knows it.")
173 return self._current_best_f
175 result: Final[int | float] = self._f_evaluate(x, fs)
176 self._current_fes = current_fes = self._current_fes + 1
177 do_term: bool = current_fes >= self._end_fes
179 improved: bool = False
180 if result < self._current_best_f:
181 improved = True
182 self._current_best_f = result
183 copyto(self._current_best_fs, fs)
184 self._copy_y(self._current_best_y, x)
185 do_term = do_term or (result <= self._end_f)
187 if self.check_in(x, fs, True) or improved:
188 self._last_improvement_fe = current_fes
189 self._current_time_nanos = ctn = _TIME_IN_NS()
190 self._last_improvement_time_nanos = ctn
192 if do_term:
193 self.terminate()
195 return result
197 def evaluate(self, x) -> float | int:
198 return self.f_evaluate(x, self._fs_temp)
200 def register(self, x, f: int | float) -> None:
201 raise ValueError(
202 "register is not supported in multi-objective optimization")
204 def get_archive(self) -> list[MORecord]:
205 return self._archive[0:self._archive_size]
207 def get_copy_of_best_fs(self, fs: np.ndarray) -> None:
208 if self._current_fes > 0:
209 return copyto(fs, self._current_best_fs)
210 raise ValueError("No current best available.")
212 def _log_own_parameters(self, logger: KeyValueLogSection) -> None:
213 super()._log_own_parameters(logger)
214 logger.key_value(KEY_ARCHIVE_MAX_SIZE, self._archive_max_size)
215 logger.key_value(KEY_ARCHIVE_PRUNE_LIMIT, self._archive_prune_limit)
217 def log_parameters_to(self, logger: KeyValueLogSection) -> None:
218 _ProcessBase.log_parameters_to(self, logger)
219 with logger.scope(SCOPE_PRUNER) as sc:
220 self._pruner.log_parameters_to(sc)
222 def _log_best(self, kv: KeyValueLogSection) -> None:
223 super()._log_best(kv)
224 kv.key_value(KEY_BEST_FS, array_to_str(self._current_best_fs))
225 kv.key_value(KEY_ARCHIVE_SIZE, self._archive_size)
227 def _log_and_check_archive_entry(self, index: int, rec: MORecord,
228 logger: Logger) -> int | float:
229 """
230 Write an archive entry.
232 :param index: the index of the entry
233 :param rec: the record to verify
234 :param logger: the logger
235 :returns: the objective value
236 """
237 self.f_validate(rec.fs)
238 self.validate(rec.x)
239 tfs: Final[np.ndarray] = self._fs_temp
240 f: Final[int | float] = self._f_evaluate(rec.x, tfs)
241 if not np.array_equal(tfs, rec.fs):
242 raise ValueError(
243 f"expected {rec.fs} but got {tfs} when re-evaluating {rec}")
244 if not isinstance(f, int | float):
245 raise type_error(f, "scalarized objective value", (int, float))
246 if not isfinite(f):
247 raise ValueError(f"scalarized objective value {f} is not finite")
249 with logger.text(f"{PREFIX_SECTION_ARCHIVE}{index}"
250 f"{SUFFIX_SECTION_ARCHIVE_Y}") as lg:
251 lg.write(self.to_str(rec.x))
252 return f
254 def _write_log(self, logger: Logger) -> None:
255 super()._write_log(logger)
257 if self._archive_size > 0:
258 # write and verify the archive
259 archive: Final[list[MORecord]] = \
260 self._archive[0:self._archive_size]
261 archive.sort()
262 qualities: Final[list[list[int | float]]] = []
263 for i, rec in enumerate(archive):
264 q: list[int | float] = [
265 np_to_py_number(n) for n in rec.fs]
266 q.insert(0, self._log_and_check_archive_entry(i, rec, logger))
267 qualities.append(q)
269 # now write the qualities
270 headline: list[str] = [
271 f"{KEY_ARCHIVE_F}{i}" for i in range(self.f_dimension())]
272 headline.insert(0, KEY_ARCHIVE_F)
273 with logger.csv(SECTION_ARCHIVE_QUALITY, headline) as csv:
274 for qq in qualities:
275 csv.row(qq)
277 def _write_mo_log(self,
278 log: list[list[int | float | np.ndarray]],
279 start_time: int,
280 keep_all: bool,
281 logger: Logger) -> None:
282 """
283 Write the multi-objective log to the logger.
285 :param log: the log
286 :param start_time: the start time
287 :param keep_all: do we need to keep all entries?
288 :param logger: the destination logger
289 """
290 loglen = len(log)
291 if loglen <= 0:
292 return
294 domination: Final[Callable[[np.ndarray, np.ndarray], int]] \
295 = self.f_dominates
297 if not keep_all:
298 # first we clean the log from potentially dominated entries
299 for i in range(loglen - 1, 0, -1):
300 reci = log[i]
301 fi = cast("int | float", reci[2])
302 fsi = cast("np.ndarray", reci[3])
303 for j in range(i - 1, -1, -1):
304 recj = log[j]
305 fj = cast("int | float", recj[2])
306 fsj = cast("np.ndarray", recj[3])
307 if (fj <= fi) and (domination(fsi, fsj) > 0):
308 del log[i]
309 break
311 header: list[str] = [PROGRESS_FES, PROGRESS_TIME_MILLIS,
312 PROGRESS_CURRENT_F]
313 header.extend(f"{PROGRESS_CURRENT_F}{i}" for i in range(
314 len(cast("np.ndarray", log[0])[3])))
316 with logger.csv(SECTION_PROGRESS, header) as csv:
317 for row in log:
318 srow = [row[0], _ns_to_ms(cast("int", row[1])
319 - start_time), row[2]]
320 srow.extend([np_to_py_number(n)
321 for n in cast("np.ndarray", row[3])])
322 csv.row(srow)
324 def __str__(self) -> str:
325 return "MOProcessWithoutSearchSpace"