Coverage for moptipy / tests / algorithm.py: 82%

120 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1"""Functions that can be used to test algorithm implementations.""" 

2from math import inf, isfinite 

3from typing import Any, Callable, Final 

4 

5from pycommons.types import check_int_range, type_error 

6 

7from moptipy.api.algorithm import ( 

8 Algorithm, 

9 Algorithm0, 

10 Algorithm1, 

11 Algorithm2, 

12 check_algorithm, 

13) 

14from moptipy.api.encoding import Encoding 

15from moptipy.api.execution import Execution 

16from moptipy.api.objective import Objective 

17from moptipy.api.operators import check_op0, check_op1, check_op2 

18from moptipy.api.space import Space 

19from moptipy.tests.component import validate_component 

20from moptipy.tests.encoding import validate_encoding 

21from moptipy.tests.objective import validate_objective 

22from moptipy.tests.space import validate_space 

23from moptipy.utils.nputils import rand_seed_generate 

24 

25 

26def validate_algorithm(algorithm: Algorithm, 

27 solution_space: Space, 

28 objective: Objective, 

29 search_space: Space | None = None, 

30 encoding: Encoding | None = None, 

31 max_fes: int = 100, 

32 required_result: int | float | None = None, 

33 uses_all_fes_if_goal_not_reached: bool = True, 

34 is_encoding_deterministic: bool = True, 

35 post: Callable[[Algorithm, int], Any] | None = None) \ 

36 -> None: 

37 """ 

38 Check whether an algorithm follows the moptipy API specification. 

39 

40 :param algorithm: the algorithm to test 

41 :param solution_space: the solution space 

42 :param objective: the objective function 

43 :param search_space: the optional search space 

44 :param encoding: the optional encoding 

45 :param max_fes: the maximum number of FEs 

46 :param required_result: the optional required result quality 

47 :param uses_all_fes_if_goal_not_reached: will the algorithm use all FEs 

48 unless it reaches the goal? 

49 :param is_encoding_deterministic: is the encoding deterministic? 

50 :param post: a check to run after each execution of the algorithm, 

51 receiving the algorithm and the number of consumed FEs as parameter 

52 :raises TypeError: if `algorithm` is not a 

53 :class:`~moptipy.api.algorithm.Algorithm` instance 

54 :raises ValueError: if `algorithm` does not behave like it should 

55 """ 

56 if not isinstance(algorithm, Algorithm): 

57 raise type_error(algorithm, "algorithm", Algorithm) 

58 if (post is not None) and (not callable(post)): 

59 raise type_error(post, "post", None, call=True) 

60 

61 check_algorithm(algorithm) 

62 if isinstance(algorithm, Algorithm0): 

63 check_op0(algorithm.op0) 

64 if isinstance(algorithm, Algorithm1): 

65 check_op1(algorithm.op1) 

66 if isinstance(algorithm, Algorithm2): 

67 check_op2(algorithm.op2) 

68 

69 validate_component(algorithm) 

70 validate_space(solution_space, None) 

71 validate_objective(objective, None, None) 

72 if encoding is not None: 

73 validate_encoding(encoding, None, None, None, 

74 is_encoding_deterministic) 

75 validate_space(search_space, None) 

76 

77 check_int_range(max_fes, "max_fes", 1, 1_000_000_000) 

78 

79 exp = Execution() 

80 exp.set_algorithm(algorithm) 

81 exp.set_max_fes(max_fes) 

82 exp.set_solution_space(solution_space) 

83 exp.set_objective(objective) 

84 seed: Final[int] = rand_seed_generate() 

85 if not isinstance(seed, int): 

86 raise type_error(seed, "seed", int) 

87 if not (0 <= seed <= 18446744073709551615): 

88 raise ValueError(f"invalid seed={seed}.") 

89 exp.set_rand_seed(seed) 

90 if search_space is not None: 

91 exp.set_search_space(search_space) 

92 exp.set_encoding(encoding) 

93 

94 lb: Final[int | float] = objective.lower_bound() 

95 if (not isfinite(lb)) and (lb != -inf): 

96 raise ValueError(f"objective lower bound cannot be {lb}" 

97 f" for {algorithm} on objective {objective}.") 

98 ub = objective.upper_bound() 

99 if (not isfinite(ub)) and (ub != inf): 

100 raise ValueError(f"objective upper bound cannot be {ub}" 

101 f" for {algorithm} on objective {objective}.") 

102 

103 if required_result is not None: 

104 if not (lb <= required_result <= ub): 

105 raise ValueError(f"required result must be in [{lb},{ub}], " 

106 f"for {algorithm} on {objective} but " 

107 f"is {required_result}") 

108 if (not isfinite(required_result)) and (required_result != -inf): 

109 raise ValueError(f"required_result must not be {required_result} " 

110 f"for {algorithm} on {objective}.") 

111 

112 progress: Final[tuple[list[int | float], list[int | float]]] = \ 

113 [], [] # the progrss lists 

114 evaluate: Final[Callable[[Any], int | float]] = objective.evaluate 

115 

116 for index in range(2 if is_encoding_deterministic else 1): 

117 

118 if is_encoding_deterministic: 

119 

120 def __k(xy, ii=index, ev=evaluate, pp=progress) -> int | float: 

121 rr = ev(xy) 

122 pp[ii].append(rr) 

123 return rr 

124 

125 objective.evaluate = __k # type: ignore 

126 

127 with exp.execute() as process: 

128 # re-raise any exception that was caught 

129 if hasattr(process, "_caught"): 

130 error = getattr(process, "_caught") 

131 if error is not None: 

132 raise error 

133 # no exception? ok, let's check the data 

134 if not process.has_best(): 

135 raise ValueError(f"The algorithm {algorithm} did not produce " 

136 f"any solution on {objective} and " 

137 f"seed {seed}.") 

138 

139 if (not process.should_terminate()) \ 

140 and uses_all_fes_if_goal_not_reached: 

141 raise ValueError(f"The algorithm {algorithm} stopped " 

142 f"before hitting the termination " 

143 f"criterion on {objective} and seed {seed}.") 

144 

145 consumed_fes: int = check_int_range( 

146 process.get_consumed_fes(), "consumed_fes", 1, max_fes) 

147 last_imp_fe: int = check_int_range( 

148 process.get_last_improvement_fe(), 

149 "last_improvement_fe", 1, consumed_fes) 

150 consumed_time: int = check_int_range( 

151 process.get_consumed_time_millis(), "consumed_time", 

152 0, 100_0000_000) 

153 last_imp_time: int = check_int_range( 

154 process.get_last_improvement_time_millis(), 

155 "last_improvement_time", 0, consumed_time) 

156 if lb != process.lower_bound(): 

157 raise ValueError( 

158 "Inconsistent lower bounds between process " 

159 f"({process.lower_bound()}) and objective ({lb})" 

160 f" for {algorithm} on {objective} and seed {seed}.") 

161 if ub != process.upper_bound(): 

162 raise ValueError( 

163 "Inconsistent upper bounds between process " 

164 f"({process.upper_bound()}) and objective ({ub}) " 

165 f" for {algorithm} on {objective} and seed {seed}.") 

166 

167 res_f: float | int = process.get_best_f() 

168 if not isfinite(res_f): 

169 raise ValueError(f"Infinite objective value of result " 

170 f"for {algorithm} on {objective}.") 

171 if (res_f < lb) or (res_f > ub): 

172 raise ValueError(f"Objective value {res_f} outside of bounds " 

173 f"[{lb},{ub}] for {algorithm} on " 

174 f"{objective} and seed {seed}.") 

175 

176 if (required_result is not None) and (res_f > required_result): 

177 raise ValueError( 

178 f"Algorithm {algorithm} should find solution of " 

179 f"quality {required_result} on {objective}, but got " 

180 f"one of {res_f} and seed {seed}.") 

181 

182 if res_f <= lb: 

183 if last_imp_fe != consumed_fes: 

184 raise ValueError( 

185 f"if result={res_f} is as good as lb={lb}, then " 

186 f"last_imp_fe={last_imp_fe} must equal" 

187 f" consumed_fe={consumed_fes} for {algorithm} on " 

188 f"{objective} and seed {seed}.") 

189 if (10_000 + (1.05 * last_imp_time)) < consumed_time: 

190 raise ValueError( 

191 f"if result={res_f} is as good as lb={lb}, then " 

192 f"last_imp_time={last_imp_time} must not be much less" 

193 f" than consumed_time={consumed_time} for " 

194 f"{algorithm} on {objective} and seed {seed}.") 

195 elif uses_all_fes_if_goal_not_reached \ 

196 and (consumed_fes != max_fes): 

197 raise ValueError( 

198 f"if result={res_f} is worse than lb={lb}, then " 

199 f"consumed_fes={consumed_fes} must equal " 

200 f"max_fes={max_fes} for {algorithm} on {objective}" 

201 f" and seed {seed}.") 

202 

203 y = solution_space.create() 

204 process.get_copy_of_best_y(y) 

205 solution_space.validate(y) 

206 check_f = objective.evaluate(y) 

207 if check_f != res_f: 

208 raise ValueError( 

209 f"Inconsistent objective value {res_f} from process " 

210 f"compared to {check_f} from objective function for " 

211 f"{algorithm} on {objective} and seed {seed}.") 

212 

213 x: Any | None = None 

214 if search_space is not None: 

215 x = search_space.create() 

216 process.get_copy_of_best_x(x) 

217 search_space.validate(x) 

218 

219 if encoding is not None: 

220 y2 = solution_space.create() 

221 encoding.decode(x, y2) 

222 solution_space.validate(y2) 

223 if is_encoding_deterministic \ 

224 and not solution_space.is_equal(y, y2): 

225 raise ValueError( 

226 f"error when mapping point in search space {x} to " 

227 f"solution {y2}, because it should be {y} for " 

228 f"{algorithm} on {objective} under " 

229 f"encoding {encoding} and seed {seed}") 

230 

231 if post is not None: 

232 post(algorithm, consumed_fes) 

233 

234 objective.evaluate = evaluate # type: ignore 

235 

236 if is_encoding_deterministic and (progress[0] != progress[1]): 

237 raise ValueError(f"when applying algorithm {algorithm} to " 

238 f"{objective} under encoding {encoding} twice " 

239 f"with the same seed {seed} did lead to different " 

240 f"runs!")