Coverage for moptipy / tests / algorithm.py: 82%
120 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""Functions that can be used to test algorithm implementations."""
2from math import inf, isfinite
3from typing import Any, Callable, Final
5from pycommons.types import check_int_range, type_error
7from moptipy.api.algorithm import (
8 Algorithm,
9 Algorithm0,
10 Algorithm1,
11 Algorithm2,
12 check_algorithm,
13)
14from moptipy.api.encoding import Encoding
15from moptipy.api.execution import Execution
16from moptipy.api.objective import Objective
17from moptipy.api.operators import check_op0, check_op1, check_op2
18from moptipy.api.space import Space
19from moptipy.tests.component import validate_component
20from moptipy.tests.encoding import validate_encoding
21from moptipy.tests.objective import validate_objective
22from moptipy.tests.space import validate_space
23from moptipy.utils.nputils import rand_seed_generate
26def validate_algorithm(algorithm: Algorithm,
27 solution_space: Space,
28 objective: Objective,
29 search_space: Space | None = None,
30 encoding: Encoding | None = None,
31 max_fes: int = 100,
32 required_result: int | float | None = None,
33 uses_all_fes_if_goal_not_reached: bool = True,
34 is_encoding_deterministic: bool = True,
35 post: Callable[[Algorithm, int], Any] | None = None) \
36 -> None:
37 """
38 Check whether an algorithm follows the moptipy API specification.
40 :param algorithm: the algorithm to test
41 :param solution_space: the solution space
42 :param objective: the objective function
43 :param search_space: the optional search space
44 :param encoding: the optional encoding
45 :param max_fes: the maximum number of FEs
46 :param required_result: the optional required result quality
47 :param uses_all_fes_if_goal_not_reached: will the algorithm use all FEs
48 unless it reaches the goal?
49 :param is_encoding_deterministic: is the encoding deterministic?
50 :param post: a check to run after each execution of the algorithm,
51 receiving the algorithm and the number of consumed FEs as parameter
52 :raises TypeError: if `algorithm` is not a
53 :class:`~moptipy.api.algorithm.Algorithm` instance
54 :raises ValueError: if `algorithm` does not behave like it should
55 """
56 if not isinstance(algorithm, Algorithm):
57 raise type_error(algorithm, "algorithm", Algorithm)
58 if (post is not None) and (not callable(post)):
59 raise type_error(post, "post", None, call=True)
61 check_algorithm(algorithm)
62 if isinstance(algorithm, Algorithm0):
63 check_op0(algorithm.op0)
64 if isinstance(algorithm, Algorithm1):
65 check_op1(algorithm.op1)
66 if isinstance(algorithm, Algorithm2):
67 check_op2(algorithm.op2)
69 validate_component(algorithm)
70 validate_space(solution_space, None)
71 validate_objective(objective, None, None)
72 if encoding is not None:
73 validate_encoding(encoding, None, None, None,
74 is_encoding_deterministic)
75 validate_space(search_space, None)
77 check_int_range(max_fes, "max_fes", 1, 1_000_000_000)
79 exp = Execution()
80 exp.set_algorithm(algorithm)
81 exp.set_max_fes(max_fes)
82 exp.set_solution_space(solution_space)
83 exp.set_objective(objective)
84 seed: Final[int] = rand_seed_generate()
85 if not isinstance(seed, int):
86 raise type_error(seed, "seed", int)
87 if not (0 <= seed <= 18446744073709551615):
88 raise ValueError(f"invalid seed={seed}.")
89 exp.set_rand_seed(seed)
90 if search_space is not None:
91 exp.set_search_space(search_space)
92 exp.set_encoding(encoding)
94 lb: Final[int | float] = objective.lower_bound()
95 if (not isfinite(lb)) and (lb != -inf):
96 raise ValueError(f"objective lower bound cannot be {lb}"
97 f" for {algorithm} on objective {objective}.")
98 ub = objective.upper_bound()
99 if (not isfinite(ub)) and (ub != inf):
100 raise ValueError(f"objective upper bound cannot be {ub}"
101 f" for {algorithm} on objective {objective}.")
103 if required_result is not None:
104 if not (lb <= required_result <= ub):
105 raise ValueError(f"required result must be in [{lb},{ub}], "
106 f"for {algorithm} on {objective} but "
107 f"is {required_result}")
108 if (not isfinite(required_result)) and (required_result != -inf):
109 raise ValueError(f"required_result must not be {required_result} "
110 f"for {algorithm} on {objective}.")
112 progress: Final[tuple[list[int | float], list[int | float]]] = \
113 [], [] # the progrss lists
114 evaluate: Final[Callable[[Any], int | float]] = objective.evaluate
116 for index in range(2 if is_encoding_deterministic else 1):
118 if is_encoding_deterministic:
120 def __k(xy, ii=index, ev=evaluate, pp=progress) -> int | float:
121 rr = ev(xy)
122 pp[ii].append(rr)
123 return rr
125 objective.evaluate = __k # type: ignore
127 with exp.execute() as process:
128 # re-raise any exception that was caught
129 if hasattr(process, "_caught"):
130 error = getattr(process, "_caught")
131 if error is not None:
132 raise error
133 # no exception? ok, let's check the data
134 if not process.has_best():
135 raise ValueError(f"The algorithm {algorithm} did not produce "
136 f"any solution on {objective} and "
137 f"seed {seed}.")
139 if (not process.should_terminate()) \
140 and uses_all_fes_if_goal_not_reached:
141 raise ValueError(f"The algorithm {algorithm} stopped "
142 f"before hitting the termination "
143 f"criterion on {objective} and seed {seed}.")
145 consumed_fes: int = check_int_range(
146 process.get_consumed_fes(), "consumed_fes", 1, max_fes)
147 last_imp_fe: int = check_int_range(
148 process.get_last_improvement_fe(),
149 "last_improvement_fe", 1, consumed_fes)
150 consumed_time: int = check_int_range(
151 process.get_consumed_time_millis(), "consumed_time",
152 0, 100_0000_000)
153 last_imp_time: int = check_int_range(
154 process.get_last_improvement_time_millis(),
155 "last_improvement_time", 0, consumed_time)
156 if lb != process.lower_bound():
157 raise ValueError(
158 "Inconsistent lower bounds between process "
159 f"({process.lower_bound()}) and objective ({lb})"
160 f" for {algorithm} on {objective} and seed {seed}.")
161 if ub != process.upper_bound():
162 raise ValueError(
163 "Inconsistent upper bounds between process "
164 f"({process.upper_bound()}) and objective ({ub}) "
165 f" for {algorithm} on {objective} and seed {seed}.")
167 res_f: float | int = process.get_best_f()
168 if not isfinite(res_f):
169 raise ValueError(f"Infinite objective value of result "
170 f"for {algorithm} on {objective}.")
171 if (res_f < lb) or (res_f > ub):
172 raise ValueError(f"Objective value {res_f} outside of bounds "
173 f"[{lb},{ub}] for {algorithm} on "
174 f"{objective} and seed {seed}.")
176 if (required_result is not None) and (res_f > required_result):
177 raise ValueError(
178 f"Algorithm {algorithm} should find solution of "
179 f"quality {required_result} on {objective}, but got "
180 f"one of {res_f} and seed {seed}.")
182 if res_f <= lb:
183 if last_imp_fe != consumed_fes:
184 raise ValueError(
185 f"if result={res_f} is as good as lb={lb}, then "
186 f"last_imp_fe={last_imp_fe} must equal"
187 f" consumed_fe={consumed_fes} for {algorithm} on "
188 f"{objective} and seed {seed}.")
189 if (10_000 + (1.05 * last_imp_time)) < consumed_time:
190 raise ValueError(
191 f"if result={res_f} is as good as lb={lb}, then "
192 f"last_imp_time={last_imp_time} must not be much less"
193 f" than consumed_time={consumed_time} for "
194 f"{algorithm} on {objective} and seed {seed}.")
195 elif uses_all_fes_if_goal_not_reached \
196 and (consumed_fes != max_fes):
197 raise ValueError(
198 f"if result={res_f} is worse than lb={lb}, then "
199 f"consumed_fes={consumed_fes} must equal "
200 f"max_fes={max_fes} for {algorithm} on {objective}"
201 f" and seed {seed}.")
203 y = solution_space.create()
204 process.get_copy_of_best_y(y)
205 solution_space.validate(y)
206 check_f = objective.evaluate(y)
207 if check_f != res_f:
208 raise ValueError(
209 f"Inconsistent objective value {res_f} from process "
210 f"compared to {check_f} from objective function for "
211 f"{algorithm} on {objective} and seed {seed}.")
213 x: Any | None = None
214 if search_space is not None:
215 x = search_space.create()
216 process.get_copy_of_best_x(x)
217 search_space.validate(x)
219 if encoding is not None:
220 y2 = solution_space.create()
221 encoding.decode(x, y2)
222 solution_space.validate(y2)
223 if is_encoding_deterministic \
224 and not solution_space.is_equal(y, y2):
225 raise ValueError(
226 f"error when mapping point in search space {x} to "
227 f"solution {y2}, because it should be {y} for "
228 f"{algorithm} on {objective} under "
229 f"encoding {encoding} and seed {seed}")
231 if post is not None:
232 post(algorithm, consumed_fes)
234 objective.evaluate = evaluate # type: ignore
236 if is_encoding_deterministic and (progress[0] != progress[1]):
237 raise ValueError(f"when applying algorithm {algorithm} to "
238 f"{objective} under encoding {encoding} twice "
239 f"with the same seed {seed} did lead to different "
240 f"runs!")