Coverage for moptipy / tests / mo_algorithm.py: 87%
146 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""Functions that can be used to test multi-objective algorithms."""
2from math import inf, isfinite
3from typing import Any, Final
5import numpy as np
6from numpy import array_equal
7from numpy.random import Generator, default_rng
8from pycommons.types import check_int_range, type_error
10from moptipy.api.algorithm import (
11 Algorithm0,
12 Algorithm1,
13 Algorithm2,
14 check_algorithm,
15)
16from moptipy.api.encoding import Encoding
17from moptipy.api.mo_algorithm import MOAlgorithm
18from moptipy.api.mo_archive import MOArchivePruner
19from moptipy.api.mo_execution import MOExecution
20from moptipy.api.mo_problem import MOProblem
21from moptipy.api.operators import check_op0, check_op1, check_op2
22from moptipy.api.space import Space
23from moptipy.mo.archive.keep_farthest import KeepFarthest
24from moptipy.tests.component import validate_component
25from moptipy.tests.encoding import validate_encoding
26from moptipy.tests.mo_problem import validate_mo_problem
27from moptipy.tests.space import validate_space
28from moptipy.utils.nputils import rand_seed_generate
def validate_mo_algorithm(
        algorithm: MOAlgorithm,
        solution_space: Space,
        problem: MOProblem,
        search_space: Space | None = None,
        encoding: Encoding | None = None,
        max_fes: int = 100,
        is_encoding_deterministic: bool = True) -> None:
    """
    Check whether a multi-objective algorithm follows the moptipy API.

    The algorithm is first checked statically (type, operators, component
    contract). It is then applied twice to `problem` via an
    :class:`~moptipy.api.mo_execution.MOExecution` that is configured with
    one randomly generated seed, a random archive size, and, with
    probability 0.5, a randomly chosen archive pruner. The first run
    records the consumed FEs, the last improvement FE, the best scalarized
    objective value, the best objective vector, and the best solution (and
    the best point in the search space, if any). The second run must
    reproduce all of them exactly.

    :param algorithm: the algorithm to test
    :param solution_space: the solution space
    :param problem: the problem to solve
    :param search_space: the optional search space
    :param encoding: the optional encoding
    :param max_fes: the maximum number of FEs
    :param is_encoding_deterministic: is the encoding deterministic?
    :raises TypeError: if `algorithm` is not a
        :class:`~moptipy.api.mo_algorithm.MOAlgorithm` instance
    :raises ValueError: if `algorithm` does not behave like it should
    """
    if not isinstance(algorithm, MOAlgorithm):
        raise type_error(algorithm, "algorithm", MOAlgorithm)

    check_algorithm(algorithm)
    if isinstance(algorithm, Algorithm0):
        check_op0(algorithm.op0)
    if isinstance(algorithm, Algorithm1):
        check_op1(algorithm.op1)
    if isinstance(algorithm, Algorithm2):
        check_op2(algorithm.op2)

    validate_component(algorithm)
    validate_mo_problem(problem, None, None)
    validate_space(solution_space, None)

    if encoding is not None:
        validate_encoding(encoding, None, None, None,
                          is_encoding_deterministic)
        validate_space(search_space, None)

    check_int_range(max_fes, "max_fes", 1, 1_000_000_000)
    # Bounds may be infinite in the "open" direction only; NaN or an
    # infinity of the wrong sign is rejected.
    lb: Final[int | float] = problem.lower_bound()
    if (not isfinite(lb)) and (lb != -inf):
        raise ValueError(f"objective lower bound cannot be {lb}.")
    ub = problem.upper_bound()
    if (not isfinite(ub)) and (ub != inf):
        raise ValueError(f"objective upper bound cannot be {ub}.")

    exp = MOExecution()
    exp.set_algorithm(algorithm)
    exp.set_max_fes(max_fes)
    exp.set_solution_space(solution_space)
    exp.set_objective(problem)
    if search_space is not None:
        exp.set_search_space(search_space)
        exp.set_encoding(encoding)

    # Randomize the archive configuration.
    random: Final[Generator] = default_rng()
    max_archive_size: Final[int] = int(random.integers(
        1, 1 << int(random.integers(1, 6))))
    exp.set_archive_max_size(max_archive_size)
    exp.set_archive_pruning_limit(
        max_archive_size + int(random.integers(0, 8)))
    if random.integers(2) <= 0:
        # With probability 0.5, install an explicitly chosen pruner.
        choice: int = int(random.integers(2))
        pruner: MOArchivePruner
        if choice <= 0:
            # Draw a random, non-empty subset of the objective indices.
            lst: list[int]
            while True:
                lst = [i for i in range(problem.f_dimension())
                       if random.integers(2) <= 0]
                if len(lst) > 0:
                    break
            pruner = KeepFarthest(problem, lst)
        else:
            pruner = MOArchivePruner()
        exp.set_archive_pruner(pruner)

    # Both runs below share this seed, so they must behave identically.
    seed: Final[int] = rand_seed_generate(random)
    exp.set_rand_seed(seed)

    # The values recorded in the first run (`is_check` is False) that the
    # second run (`is_check` is True) is compared against.
    l_consumed_fes: int = -1
    l_last_improvement_fe: int = -1
    l_res_f: int | float = inf
    l_y: Any = None
    l_fs: np.ndarray | None = None
    l_x: Any = None
    for is_check in [False, True]:
        with exp.execute() as process:
            # re-raise any exception that was caught
            error = getattr(process, "_caught", None)
            if error is not None:
                raise error
            # no exception? ok, let's check the data
            if not process.has_best():
                raise ValueError(
                    "The algorithm did not produce any solution.")

            if not process.should_terminate():
                raise ValueError(
                    "The algorithm stopped before hitting the "
                    "termination criterion.")
            consumed_fes: int = check_int_range(
                process.get_consumed_fes(), "consumed_fes", 1, max_fes)
            if is_check:
                if consumed_fes != l_consumed_fes:
                    raise ValueError(
                        f"consumed FEs changed from {l_consumed_fes} to "
                        f"{consumed_fes} in second run for seed {seed}")
            else:
                l_consumed_fes = consumed_fes
            last_improvement_fe = process.get_last_improvement_fe()
            check_int_range(last_improvement_fe, "last_improvement_fe",
                            1, consumed_fes)
            if is_check:
                if last_improvement_fe != l_last_improvement_fe:
                    raise ValueError(
                        "last improvement FEs changed from "
                        f"{l_last_improvement_fe} to {last_improvement_fe} in"
                        f" second run for seed {seed}")
            else:
                l_last_improvement_fe = last_improvement_fe

            consumed_time: int = check_int_range(
                process.get_consumed_time_millis(), "consumed_time",
                0, 1_000_000_000)
            check_int_range(
                process.get_last_improvement_time_millis(),
                "last_improvement_time", 0, consumed_time)

            if lb != process.lower_bound():
                raise ValueError(
                    "Inconsistent lower bounds between process "
                    f"({process.lower_bound()}) and scalarized "
                    f"objective ({lb}).")
            if ub != process.upper_bound():
                raise ValueError(
                    "Inconsistent upper bounds between process "
                    f"({process.upper_bound()}) and scalarized "
                    f"objective ({ub}).")

            res_f: float | int = process.get_best_f()
            if not isfinite(res_f):
                raise ValueError(
                    "Infinite scalarized objective value of result.")
            if (res_f < lb) or (res_f > ub):
                raise ValueError(
                    f"Objective value {res_f} outside of bounds [{lb},{ub}].")
            if is_check:
                if res_f != l_res_f:
                    raise ValueError(
                        f"result f changed from {l_res_f} to {res_f} in"
                        f" second run for seed {seed}")
            else:
                l_res_f = res_f

            y = solution_space.create()
            process.get_copy_of_best_y(y)
            solution_space.validate(y)
            fs1 = problem.f_create()
            fs2 = problem.f_create()
            # NOTE(review): the best solution is copied a second time, as
            # in the original code; presumably this is redundant - confirm.
            process.get_copy_of_best_y(y)
            # Re-evaluating the best solution must reproduce both the
            # scalarized value and the objective vector of the process.
            check_f = problem.f_evaluate(y, fs1)
            if check_f != res_f:
                raise ValueError(
                    f"Inconsistent objective value {res_f} from process "
                    f"compared to {check_f} from objective function.")
            process.get_copy_of_best_fs(fs2)
            if not array_equal(fs1, fs2):
                raise ValueError(
                    f"Inconsistent objective vectors {fs1} and {fs2}.")

            if is_check:
                if not solution_space.is_equal(y, l_y):
                    raise ValueError(f"solution changed from {l_y} to {y} in "
                                     f"the second run of seed {seed}")
                if not np.array_equal(fs1, l_fs):
                    raise ValueError(
                        f"result fs changed from {l_fs} to {fs1} in the "
                        f"second run of seed {seed}")
            else:
                l_y = y
                l_fs = fs1

            x: Any | None = None
            if search_space is not None:
                x = search_space.create()
                process.get_copy_of_best_x(x)
                search_space.validate(x)

                if is_check:
                    if not search_space.is_equal(x, l_x):
                        raise ValueError(
                            f"result x changed from {l_x} to {x} in the "
                            f"second run of seed {seed}")
                else:
                    l_x = x

            if encoding is not None:
                y2 = solution_space.create()
                encoding.decode(x, y2)
                solution_space.validate(y2)
                # The result of this comparison used to be discarded, which
                # made the check a no-op; a mismatch is now reported.
                if is_encoding_deterministic and (
                        not solution_space.is_equal(y, y2)):
                    raise ValueError(
                        f"Deterministic encoding decodes best x {x} to "
                        f"{y2}, but the best y of the process is {y}.")