Coverage for moptipy / api / _mo_process_no_ss.py: 94%
177 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 10:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 10:36 +0000
1"""Providing a multi-objective process without logging with a single space."""
3from math import isfinite
4from typing import Any, Callable, Final, cast
6import numpy as np
7from numpy import copyto
8from pycommons.io.csv import CSV_SEPARATOR
9from pycommons.io.path import Path
10from pycommons.strings.string_conv import num_to_str
11from pycommons.types import check_int_range, type_error
13from moptipy.api._process_base import _TIME_IN_NS, _ns_to_ms, _ProcessBase
14from moptipy.api.algorithm import Algorithm
15from moptipy.api.improvement_logger import ImprovementLogger
16from moptipy.api.logging import (
17 KEY_ARCHIVE_F,
18 KEY_ARCHIVE_MAX_SIZE,
19 KEY_ARCHIVE_PRUNE_LIMIT,
20 KEY_ARCHIVE_SIZE,
21 KEY_BEST_FS,
22 KEY_CURRENT_F,
23 KEY_CURRENT_FS,
24 PREFIX_SECTION_ARCHIVE,
25 PROGRESS_CURRENT_F,
26 PROGRESS_FES,
27 PROGRESS_TIME_MILLIS,
28 SCOPE_PRUNER,
29 SECTION_ARCHIVE_QUALITY,
30 SECTION_PROGRESS,
31 SECTION_RESULT_X,
32 SECTION_RESULT_Y,
33 SUFFIX_SECTION_ARCHIVE_Y,
34)
35from moptipy.api.mo_archive import (
36 MOArchivePruner,
37 MORecord,
38 check_mo_archive_pruner,
39)
40from moptipy.api.mo_problem import MOProblem
41from moptipy.api.mo_process import MOProcess
42from moptipy.api.space import Space
43from moptipy.utils.logger import KeyValueLogSection, Logger
44from moptipy.utils.nputils import array_to_str, np_to_py_number
47class _MOProcessNoSS(MOProcess, _ProcessBase):
48 """
49 An internal class process implementation.
51 This class implements a stand-alone process without explicit logging where
52 the search and solution space are the same.
53 """
55 def __init__(self,
56 solution_space: Space,
57 objective: MOProblem,
58 algorithm: Algorithm,
59 pruner: MOArchivePruner,
60 archive_max_size: int,
61 archive_prune_limit: int,
62 log_file: Path | None = None,
63 rand_seed: int | None = None,
64 max_fes: int | None = None,
65 max_time_millis: int | None = None,
66 goal_f: int | float | None = None,
67 improvement_logger: ImprovementLogger | None = None) -> None:
68 """
69 Perform the internal initialization. Do not call directly.
71 :param solution_space: the search- and solution space.
72 :param objective: the objective function
73 :param algorithm: the optimization algorithm
74 :param pruner: the archive pruner
75 :param archive_max_size: the maximum archive size after pruning
76 :param archive_prune_limit: the archive size above which pruning will
77 be performed
78 :param log_file: the optional log file
79 :param rand_seed: the optional random seed
80 :param max_fes: the maximum permitted function evaluations
81 :param max_time_millis: the maximum runtime in milliseconds
82 :param goal_f: the goal objective value. if it is reached, the process
83 is terminated
84 :param improvement_logger: an improvement logger, whose
85 :meth:`~ImprovementLogger.log_improvement` method will be invoked
86 whenever the process has registered an improvement
87 """
88 _ProcessBase.__init__(
89 self,
90 solution_space=solution_space,
91 objective=objective,
92 algorithm=algorithm,
93 log_file=log_file,
94 rand_seed=rand_seed,
95 max_fes=max_fes,
96 max_time_millis=max_time_millis,
97 goal_f=goal_f,
98 improvement_logger=improvement_logger)
99 self.f_dimension = objective.f_dimension # type: ignore
100 self.f_create = objective.f_create # type: ignore
101 self.f_validate = objective.f_validate # type: ignore
102 self.f_dtype = objective.f_dtype # type: ignore
103 self.f_dominates = objective.f_dominates # type: ignore
105 #: the internal evaluation function
106 self._f_evaluate: Final[Callable[
107 [Any, np.ndarray], int | float]] = objective.f_evaluate
109 #: the temporary variable for objective function evaluations
110 self._fs_temp: Final[np.ndarray] = self.f_create()
111 #: the holder for the objective vector of the current best solution
112 self._current_best_fs: Final[np.ndarray] = self.f_create()
113 #: the internal archive pruner
114 self._pruner: Final[MOArchivePruner] = check_mo_archive_pruner(pruner)
115 #: the fast call to the pruning routine
116 self._prune: Final[Callable[[list[MORecord], int, int], None]] \
117 = pruner.prune
118 #: the archive prune limit
119 self._archive_prune_limit: Final[int] = check_int_range(
120 archive_prune_limit, "archive_prune_limit")
121 #: the maximum archive size
122 self._archive_max_size: Final[int] = check_int_range(
123 archive_max_size, "archive_max_size", 1, archive_prune_limit)
124 #: the current archive size
125 self._archive_size: int = 0
126 #: the internal archive (pre-allocated to the prune limit)
127 self._archive: Final[list[MORecord]] = []
129 def _after_init(self) -> None:
130 self._archive.extend(
131 MORecord(self.create(), self.f_create())
132 for _ in range(self._archive_prune_limit))
133 super()._after_init()
135 def check_in(self, x: Any, fs: np.ndarray,
136 prune_if_necessary: bool = False) -> bool:
137 """
138 Check a solution into the archive.
140 :param x: the point in the search space
141 :param fs: the vector of objective values
142 :param prune_if_necessary: should we prune the archive if it becomes
143 too large? `False` means that the archive may grow unbounded
144 :returns: `True` if the solution was non-dominated, `False` if it was
145 dominated by at least one solution in the archive
146 """
147 archive: Final[list[MORecord]] = self._archive
148 added_to_archive: bool = False
149 archive_size: int = self._archive_size
150 # we update the archive
151 domination: Final[Callable[[np.ndarray, np.ndarray], int]] \
152 = self.f_dominates
153 for i in range(archive_size - 1, -1, -1):
154 ae: MORecord = archive[i]
155 d: int = domination(fs, ae.fs)
156 if d < 0: # the new solution dominates an archived one
157 if added_to_archive: # if already added, shrink archive
158 archive_size -= 1
159 archive[archive_size], archive[i] = \
160 ae, archive[archive_size]
161 else: # if not added, overwrite dominated solution
162 self.copy(ae.x, x)
163 copyto(ae.fs, fs)
164 added_to_archive = True
165 elif d > 0:
166 return False
168 if added_to_archive: # already added, can quit
169 self._archive_size = archive_size
170 else: # still need to add
171 if archive_size >= len(archive):
172 ae = MORecord(self.create(), self.f_create())
173 archive.append(ae)
174 else:
175 ae = archive[archive_size]
176 self.copy(ae.x, x)
177 copyto(ae.fs, fs)
178 archive_size += 1
179 if prune_if_necessary \
180 and (archive_size > self._archive_prune_limit):
181 self._prune(archive, self._archive_max_size, archive_size)
182 self._archive_size = self._archive_max_size
183 else:
184 self._archive_size = archive_size
185 return True
187 def f_evaluate(self, x, fs: np.ndarray) -> float | int:
188 if self._terminated:
189 if self._knows_that_terminated:
190 raise ValueError("The process has been terminated and "
191 "the algorithm knows it.")
192 return self._current_best_f
194 result: Final[int | float] = self._f_evaluate(x, fs)
195 self._current_fes = current_fes = self._current_fes + 1
196 do_term: bool = current_fes >= self._end_fes
198 improved: bool = False
199 if result < self._current_best_f:
200 improved = True
201 self._current_best_f = result
202 copyto(self._current_best_fs, fs)
203 self._copy_y(self._current_best_y, x)
204 do_term = do_term or (result <= self._end_f)
206 if self.check_in(x, fs, True) or improved:
207 if self._log_improvement:
208 self._log_improvement(
209 cast("Callable[[Logger], None]",
210 lambda lg, _x=x, _f=result, _fs=fs:
211 self._write_improvement(lg, None, _x, _f, _fs)))
212 self._last_improvement_fe = current_fes
213 self._current_time_nanos = ctn = _TIME_IN_NS()
214 self._last_improvement_time_nanos = ctn
216 if do_term:
217 self.terminate()
219 return result
221 def evaluate(self, x) -> float | int:
222 return self.f_evaluate(x, self._fs_temp)
224 def register(self, x, f: int | float) -> None:
225 raise ValueError(
226 "register is not supported in multi-objective optimization")
228 def get_archive(self) -> list[MORecord]:
229 return self._archive[0:self._archive_size]
231 def get_copy_of_best_fs(self, fs: np.ndarray) -> None:
232 if self._current_fes > 0:
233 return copyto(fs, self._current_best_fs)
234 raise ValueError("No current best available.")
236 def _log_own_parameters(self, logger: KeyValueLogSection) -> None:
237 super()._log_own_parameters(logger)
238 logger.key_value(KEY_ARCHIVE_MAX_SIZE, self._archive_max_size)
239 logger.key_value(KEY_ARCHIVE_PRUNE_LIMIT, self._archive_prune_limit)
241 def log_parameters_to(self, logger: KeyValueLogSection) -> None:
242 _ProcessBase.log_parameters_to(self, logger)
243 with logger.scope(SCOPE_PRUNER) as sc:
244 self._pruner.log_parameters_to(sc)
246 def _log_best(self, kv: KeyValueLogSection) -> None:
247 super()._log_best(kv)
248 kv.key_value(KEY_BEST_FS, array_to_str(self._current_best_fs))
249 kv.key_value(KEY_ARCHIVE_SIZE, self._archive_size)
251 def _log_and_check_archive_entry(self, index: int, rec: MORecord,
252 logger: Logger) -> int | float:
253 """
254 Write an archive entry.
256 :param index: the index of the entry
257 :param rec: the record to verify
258 :param logger: the logger
259 :returns: the objective value
260 """
261 self.f_validate(rec.fs)
262 self.validate(rec.x)
263 tfs: Final[np.ndarray] = self._fs_temp
264 f: Final[int | float] = self._f_evaluate(rec.x, tfs)
265 if not np.array_equal(tfs, rec.fs):
266 raise ValueError(
267 f"expected {rec.fs} but got {tfs} when re-evaluating {rec}")
268 if not isinstance(f, int | float):
269 raise type_error(f, "scalarized objective value", (int, float))
270 if not isfinite(f):
271 raise ValueError(f"scalarized objective value {f} is not finite")
273 with logger.text(f"{PREFIX_SECTION_ARCHIVE}{index}"
274 f"{SUFFIX_SECTION_ARCHIVE_Y}") as lg:
275 lg.write(self.to_str(rec.x))
276 return f
278 def _write_log(self, logger: Logger) -> None:
279 super()._write_log(logger)
281 if self._archive_size > 0:
282 # write and verify the archive
283 archive: Final[list[MORecord]] = \
284 self._archive[0:self._archive_size]
285 archive.sort()
286 qualities: Final[list[list[int | float]]] = []
287 for i, rec in enumerate(archive):
288 q: list[int | float] = [
289 np_to_py_number(n) for n in rec.fs]
290 q.insert(0, self._log_and_check_archive_entry(i, rec, logger))
291 qualities.append(q)
293 # now write the qualities
294 headline: list[str] = [
295 f"{KEY_ARCHIVE_F}{i}" for i in range(self.f_dimension())]
296 headline.insert(0, KEY_ARCHIVE_F)
297 with logger.csv(SECTION_ARCHIVE_QUALITY, headline) as csv:
298 for qq in qualities:
299 csv.row(qq)
301 def _write_mo_log(self,
302 log: list[list[int | float | np.ndarray]],
303 start_time: int,
304 keep_all: bool,
305 logger: Logger) -> None:
306 """
307 Write the multi-objective log to the logger.
309 :param log: the log
310 :param start_time: the start time
311 :param keep_all: do we need to keep all entries?
312 :param logger: the destination logger
313 """
314 loglen = len(log)
315 if loglen <= 0:
316 return
318 domination: Final[Callable[[np.ndarray, np.ndarray], int]] \
319 = self.f_dominates
321 if not keep_all:
322 # first we clean the log from potentially dominated entries
323 for i in range(loglen - 1, 0, -1):
324 reci = log[i]
325 fi = cast("int | float", reci[2])
326 fsi = cast("np.ndarray", reci[3])
327 for j in range(i - 1, -1, -1):
328 recj = log[j]
329 fj = cast("int | float", recj[2])
330 fsj = cast("np.ndarray", recj[3])
331 if (fj <= fi) and (domination(fsi, fsj) > 0):
332 del log[i]
333 break
335 header: list[str] = [PROGRESS_FES, PROGRESS_TIME_MILLIS,
336 PROGRESS_CURRENT_F]
337 header.extend(f"{PROGRESS_CURRENT_F}{i}" for i in range(
338 len(cast("np.ndarray", log[0])[3])))
340 with logger.csv(SECTION_PROGRESS, header) as csv:
341 for row in log:
342 srow = [row[0], _ns_to_ms(cast("int", row[1])
343 - start_time), row[2]]
344 srow.extend([np_to_py_number(n)
345 for n in cast("np.ndarray", row[3])])
346 csv.row(srow)
348 def __str__(self) -> str:
349 return "MOProcessWithoutSearchSpace"
351 def _write_improvement(self, logger: Logger, x, y,
352 f: int | float, fs: np.ndarray) -> None:
353 """
354 Write an improvement to the logger.
356 :param logger: the logger
357 :param x: the point in the search space
358 :param y: the point in the solution space
359 :param f: the objective value
360 :param fs: the vector with the objective values
361 """
362 self._write_state_and_setup(
363 logger, ((KEY_CURRENT_F, num_to_str(f)),
364 (KEY_CURRENT_FS, CSV_SEPARATOR.join(map(
365 num_to_str, map(np_to_py_number, fs))))))
366 if x is not None:
367 with logger.text(SECTION_RESULT_X) as txt:
368 txt.write(x)
369 with logger.text(SECTION_RESULT_Y) as txt:
370 txt.write(self._solution_space.to_str(y))