Coverage for moptipy / tests / mo_algorithm.py: 87%

146 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1"""Functions that can be used to test multi-objective algorithms.""" 

2from math import inf, isfinite 

3from typing import Any, Final 

4 

5import numpy as np 

6from numpy import array_equal 

7from numpy.random import Generator, default_rng 

8from pycommons.types import check_int_range, type_error 

9 

10from moptipy.api.algorithm import ( 

11 Algorithm0, 

12 Algorithm1, 

13 Algorithm2, 

14 check_algorithm, 

15) 

16from moptipy.api.encoding import Encoding 

17from moptipy.api.mo_algorithm import MOAlgorithm 

18from moptipy.api.mo_archive import MOArchivePruner 

19from moptipy.api.mo_execution import MOExecution 

20from moptipy.api.mo_problem import MOProblem 

21from moptipy.api.operators import check_op0, check_op1, check_op2 

22from moptipy.api.space import Space 

23from moptipy.mo.archive.keep_farthest import KeepFarthest 

24from moptipy.tests.component import validate_component 

25from moptipy.tests.encoding import validate_encoding 

26from moptipy.tests.mo_problem import validate_mo_problem 

27from moptipy.tests.space import validate_space 

28from moptipy.utils.nputils import rand_seed_generate 

29 

30 

def validate_mo_algorithm(
        algorithm: MOAlgorithm,
        solution_space: Space,
        problem: MOProblem,
        search_space: Space | None = None,
        encoding: Encoding | None = None,
        max_fes: int = 100,
        is_encoding_deterministic: bool = True) -> None:
    """
    Check whether a multi-objective algorithm follows the moptipy API.

    The algorithm is applied twice to `problem` with the same random seed
    and a randomly configured archive. After each run, the end result
    (consumed FEs, last improvement FE, best scalarized objective value,
    best objective vector, best solution, and - if a search space is
    given - best point in the search space) is checked for consistency.
    The second run must reproduce the results of the first run exactly.

    :param algorithm: the algorithm to test
    :param solution_space: the solution space
    :param problem: the problem to solve
    :param search_space: the optional search space
    :param encoding: the optional encoding
    :param max_fes: the maximum number of FEs
    :param is_encoding_deterministic: is the encoding deterministic?
    :raises TypeError: if `algorithm` is not a
        :class:`~moptipy.api.mo_algorithm.MOAlgorithm` instance
    :raises ValueError: if `algorithm` does not behave like it should
    """
    if not isinstance(algorithm, MOAlgorithm):
        raise type_error(algorithm, "algorithm", MOAlgorithm)

    # Check the algorithm and, where applicable, its search operators.
    check_algorithm(algorithm)
    if isinstance(algorithm, Algorithm0):
        check_op0(algorithm.op0)
    if isinstance(algorithm, Algorithm1):
        check_op1(algorithm.op1)
    if isinstance(algorithm, Algorithm2):
        check_op2(algorithm.op2)

    # Validate all components that take part in the experiment.
    validate_component(algorithm)
    validate_mo_problem(problem, None, None)
    validate_space(solution_space, None)

    if encoding is not None:
        validate_encoding(encoding, None, None, None,
                          is_encoding_deterministic)
        validate_space(search_space, None)

    check_int_range(max_fes, "max_fes", 1, 1_000_000_000)

    # The bounds may be finite or -inf / +inf, but nothing else (e.g. nan).
    lb: Final[int | float] = problem.lower_bound()
    if (not isfinite(lb)) and (lb != -inf):
        raise ValueError(f"objective lower bound cannot be {lb}.")
    ub = problem.upper_bound()
    if (not isfinite(ub)) and (ub != inf):
        raise ValueError(f"objective upper bound cannot be {ub}.")

    # Set up the experiment.
    exp = MOExecution()
    exp.set_algorithm(algorithm)
    exp.set_max_fes(max_fes)
    exp.set_solution_space(solution_space)
    exp.set_objective(problem)
    if search_space is not None:
        exp.set_search_space(search_space)
        exp.set_encoding(encoding)

    # Randomly configure the archive: size, pruning limit, and pruner.
    random: Final[Generator] = default_rng()
    max_archive_size: Final[int] = int(random.integers(
        1, 1 << int(random.integers(1, 6))))
    exp.set_archive_max_size(max_archive_size)
    exp.set_archive_pruning_limit(
        max_archive_size + int(random.integers(0, 8)))
    if random.integers(2) <= 0:
        choice: int = int(random.integers(2))
        pruner: MOArchivePruner
        if choice <= 0:
            # Pick a random, non-empty subset of the objective dimensions.
            lst: list[int]
            while True:
                lst = [i for i in range(problem.f_dimension())
                       if random.integers(2) <= 0]
                if len(lst) > 0:
                    break
            pruner = KeepFarthest(problem, lst)
        else:
            pruner = MOArchivePruner()
        exp.set_archive_pruner(pruner)

    # Both runs use the same seed and therefore must behave identically.
    seed: Final[int] = rand_seed_generate(random)
    exp.set_rand_seed(seed)

    # The results of the first run, used for comparison in the second run.
    l_consumed_fes: int = -1
    l_last_improvement_fe: int = -1
    l_res_f: int | float = inf
    l_y: Any = None
    l_fs: np.ndarray | None = None
    l_x: Any = None
    for is_check in [False, True]:
        with exp.execute() as process:
            # re-raise any exception that was caught
            error = getattr(process, "_caught", None)
            if error is not None:
                raise error
            # no exception? ok, let's check the data
            if not process.has_best():
                raise ValueError(
                    "The algorithm did not produce any solution.")

            if not process.should_terminate():
                raise ValueError(
                    "The algorithm stopped before hitting the "
                    "termination criterion.")
            consumed_fes: int = check_int_range(
                process.get_consumed_fes(), "consumed_fes", 1, max_fes)
            if is_check:
                if consumed_fes != l_consumed_fes:
                    raise ValueError(
                        f"consumed FEs changed from {l_consumed_fes} to "
                        f"{consumed_fes} in second run for seed {seed}")
            else:
                l_consumed_fes = consumed_fes
            last_improvement_fe = process.get_last_improvement_fe()
            check_int_range(last_improvement_fe, "last_improvement_fe",
                            1, consumed_fes)
            if is_check:
                if last_improvement_fe != l_last_improvement_fe:
                    raise ValueError(
                        "last improvement FEs changed from "
                        f"{l_last_improvement_fe} to {last_improvement_fe} in"
                        f" second run for seed {seed}")
            else:
                l_last_improvement_fe = last_improvement_fe

            consumed_time: int = check_int_range(
                process.get_consumed_time_millis(), "consumed_time",
                0, 1_000_000_000)
            check_int_range(
                process.get_last_improvement_time_millis(),
                "last_improvement_time", 0, consumed_time)

            # The process must report the same bounds as the problem.
            if lb != process.lower_bound():
                raise ValueError(
                    "Inconsistent lower bounds between process "
                    f"({process.lower_bound()}) and scalarized "
                    f"objective ({lb}).")
            if ub != process.upper_bound():
                raise ValueError(
                    "Inconsistent upper bounds between process "
                    f"({process.upper_bound()}) and scalarized "
                    f"objective ({ub}).")

            res_f: float | int = process.get_best_f()
            if not isfinite(res_f):
                raise ValueError(
                    "Infinite scalarized objective value of result.")
            if (res_f < lb) or (res_f > ub):
                raise ValueError(
                    f"Objective value {res_f} outside of bounds [{lb},{ub}].")
            if is_check:
                if res_f != l_res_f:
                    raise ValueError(
                        f"result f changed from {l_res_f} to {res_f} in"
                        f" second run for seed {seed}")
            else:
                l_res_f = res_f

            # Re-evaluate the best solution and compare with the process.
            y = solution_space.create()
            process.get_copy_of_best_y(y)
            solution_space.validate(y)
            fs1 = problem.f_create()
            fs2 = problem.f_create()
            process.get_copy_of_best_y(y)
            check_f = problem.f_evaluate(y, fs1)
            if check_f != res_f:
                raise ValueError(
                    f"Inconsistent objective value {res_f} from process "
                    f"compared to {check_f} from objective function.")
            process.get_copy_of_best_fs(fs2)
            if not array_equal(fs1, fs2):
                raise ValueError(
                    f"Inconsistent objective vectors {fs1} and {fs2}.")

            # res_f was already compared with l_res_f above, so only the
            # solution and the objective vector remain to be compared.
            if is_check:
                if not solution_space.is_equal(y, l_y):
                    raise ValueError(f"solution changed from {l_y} to {y} in "
                                     f"the second run of seed {seed}")
                if not np.array_equal(fs1, l_fs):
                    raise ValueError(
                        f"result fs changed from {l_fs} to {fs1} in the "
                        f"second run of seed {seed}")
            else:
                l_y = y
                l_fs = fs1

            x: Any | None = None
            if search_space is not None:
                x = search_space.create()
                process.get_copy_of_best_x(x)
                search_space.validate(x)

                if is_check:
                    if not search_space.is_equal(x, l_x):
                        raise ValueError(
                            f"result x changed from {l_x} to {x} in the "
                            f"second run of seed {seed}")
                else:
                    l_x = x

            if encoding is not None:
                y2 = solution_space.create()
                encoding.decode(x, y2)
                solution_space.validate(y2)
                # A deterministic encoding must map the best point in the
                # search space to exactly the best solution. The result of
                # this comparison must not be ignored.
                if is_encoding_deterministic and (
                        not solution_space.is_equal(y, y2)):
                    raise ValueError(
                        f"Decoding best x {x} yields {y2}, which differs "
                        f"from best y {y}, but the encoding is supposed "
                        "to be deterministic.")