Coverage for moptipy / api / _mo_process_no_ss.py: 93%

165 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1"""Providing a multi-objective process without logging with a single space.""" 

2 

3from math import isfinite 

4from typing import Any, Callable, Final, cast 

5 

6import numpy as np 

7from numpy import copyto 

8from pycommons.io.path import Path 

9from pycommons.types import check_int_range, type_error 

10 

11from moptipy.api._process_base import _TIME_IN_NS, _ns_to_ms, _ProcessBase 

12from moptipy.api.algorithm import Algorithm 

13from moptipy.api.logging import ( 

14 KEY_ARCHIVE_F, 

15 KEY_ARCHIVE_MAX_SIZE, 

16 KEY_ARCHIVE_PRUNE_LIMIT, 

17 KEY_ARCHIVE_SIZE, 

18 KEY_BEST_FS, 

19 PREFIX_SECTION_ARCHIVE, 

20 PROGRESS_CURRENT_F, 

21 PROGRESS_FES, 

22 PROGRESS_TIME_MILLIS, 

23 SCOPE_PRUNER, 

24 SECTION_ARCHIVE_QUALITY, 

25 SECTION_PROGRESS, 

26 SUFFIX_SECTION_ARCHIVE_Y, 

27) 

28from moptipy.api.mo_archive import ( 

29 MOArchivePruner, 

30 MORecord, 

31 check_mo_archive_pruner, 

32) 

33from moptipy.api.mo_problem import MOProblem 

34from moptipy.api.mo_process import MOProcess 

35from moptipy.api.space import Space 

36from moptipy.utils.logger import KeyValueLogSection, Logger 

37from moptipy.utils.nputils import array_to_str, np_to_py_number 

38 

39 

40class _MOProcessNoSS(MOProcess, _ProcessBase): 

41 """ 

42 An internal class process implementation. 

43 

44 This class implements a stand-alone process without explicit logging where 

45 the search and solution space are the same. 

46 """ 

47 

48 def __init__(self, 

49 solution_space: Space, 

50 objective: MOProblem, 

51 algorithm: Algorithm, 

52 pruner: MOArchivePruner, 

53 archive_max_size: int, 

54 archive_prune_limit: int, 

55 log_file: Path | None = None, 

56 rand_seed: int | None = None, 

57 max_fes: int | None = None, 

58 max_time_millis: int | None = None, 

59 goal_f: int | float | None = None) -> None: 

60 """ 

61 Perform the internal initialization. Do not call directly. 

62 

63 :param solution_space: the search- and solution space. 

64 :param objective: the objective function 

65 :param algorithm: the optimization algorithm 

66 :param pruner: the archive pruner 

67 :param archive_max_size: the maximum archive size after pruning 

68 :param archive_prune_limit: the archive size above which pruning will 

69 be performed 

70 :param log_file: the optional log file 

71 :param rand_seed: the optional random seed 

72 :param max_fes: the maximum permitted function evaluations 

73 :param max_time_millis: the maximum runtime in milliseconds 

74 :param goal_f: the goal objective value. if it is reached, the process 

75 is terminated 

76 """ 

77 _ProcessBase.__init__( 

78 self, solution_space, objective, algorithm, log_file, rand_seed, 

79 max_fes, max_time_millis, goal_f) 

80 self.f_dimension = objective.f_dimension # type: ignore 

81 self.f_create = objective.f_create # type: ignore 

82 self.f_validate = objective.f_validate # type: ignore 

83 self.f_dtype = objective.f_dtype # type: ignore 

84 self.f_dominates = objective.f_dominates # type: ignore 

85 

86 #: the internal evaluation function 

87 self._f_evaluate: Final[Callable[ 

88 [Any, np.ndarray], int | float]] = objective.f_evaluate 

89 

90 #: the temporary variable for objective function evaluations 

91 self._fs_temp: Final[np.ndarray] = self.f_create() 

92 #: the holder for the objective vector of the current best solution 

93 self._current_best_fs: Final[np.ndarray] = self.f_create() 

94 #: the internal archive pruner 

95 self._pruner: Final[MOArchivePruner] = check_mo_archive_pruner(pruner) 

96 #: the fast call to the pruning routine 

97 self._prune: Final[Callable[[list[MORecord], int, int], None]] \ 

98 = pruner.prune 

99 #: the archive prune limit 

100 self._archive_prune_limit: Final[int] = check_int_range( 

101 archive_prune_limit, "archive_prune_limit") 

102 #: the maximum archive size 

103 self._archive_max_size: Final[int] = check_int_range( 

104 archive_max_size, "archive_max_size", 1, archive_prune_limit) 

105 #: the current archive size 

106 self._archive_size: int = 0 

107 #: the internal archive (pre-allocated to the prune limit) 

108 self._archive: Final[list[MORecord]] = [] 

109 

110 def _after_init(self) -> None: 

111 self._archive.extend( 

112 MORecord(self.create(), self.f_create()) 

113 for _ in range(self._archive_prune_limit)) 

114 super()._after_init() 

115 

116 def check_in(self, x: Any, fs: np.ndarray, 

117 prune_if_necessary: bool = False) -> bool: 

118 """ 

119 Check a solution into the archive. 

120 

121 :param x: the point in the search space 

122 :param fs: the vector of objective values 

123 :param prune_if_necessary: should we prune the archive if it becomes 

124 too large? `False` means that the archive may grow unbounded 

125 :returns: `True` if the solution was non-dominated, `False` if it was 

126 dominated by at least one solution in the archive 

127 """ 

128 archive: Final[list[MORecord]] = self._archive 

129 added_to_archive: bool = False 

130 archive_size: int = self._archive_size 

131 # we update the archive 

132 domination: Final[Callable[[np.ndarray, np.ndarray], int]] \ 

133 = self.f_dominates 

134 for i in range(archive_size - 1, -1, -1): 

135 ae: MORecord = archive[i] 

136 d: int = domination(fs, ae.fs) 

137 if d < 0: # the new solution dominates an archived one 

138 if added_to_archive: # if already added, shrink archive 

139 archive_size -= 1 

140 archive[archive_size], archive[i] = \ 

141 ae, archive[archive_size] 

142 else: # if not added, overwrite dominated solution 

143 self.copy(ae.x, x) 

144 copyto(ae.fs, fs) 

145 added_to_archive = True 

146 elif d > 0: 

147 return False 

148 

149 if added_to_archive: # already added, can quit 

150 self._archive_size = archive_size 

151 else: # still need to add 

152 if archive_size >= len(archive): 

153 ae = MORecord(self.create(), self.f_create()) 

154 archive.append(ae) 

155 else: 

156 ae = archive[archive_size] 

157 self.copy(ae.x, x) 

158 copyto(ae.fs, fs) 

159 archive_size += 1 

160 if prune_if_necessary \ 

161 and (archive_size > self._archive_prune_limit): 

162 self._prune(archive, self._archive_max_size, archive_size) 

163 self._archive_size = self._archive_max_size 

164 else: 

165 self._archive_size = archive_size 

166 return True 

167 

168 def f_evaluate(self, x, fs: np.ndarray) -> float | int: 

169 if self._terminated: 

170 if self._knows_that_terminated: 

171 raise ValueError("The process has been terminated and " 

172 "the algorithm knows it.") 

173 return self._current_best_f 

174 

175 result: Final[int | float] = self._f_evaluate(x, fs) 

176 self._current_fes = current_fes = self._current_fes + 1 

177 do_term: bool = current_fes >= self._end_fes 

178 

179 improved: bool = False 

180 if result < self._current_best_f: 

181 improved = True 

182 self._current_best_f = result 

183 copyto(self._current_best_fs, fs) 

184 self._copy_y(self._current_best_y, x) 

185 do_term = do_term or (result <= self._end_f) 

186 

187 if self.check_in(x, fs, True) or improved: 

188 self._last_improvement_fe = current_fes 

189 self._current_time_nanos = ctn = _TIME_IN_NS() 

190 self._last_improvement_time_nanos = ctn 

191 

192 if do_term: 

193 self.terminate() 

194 

195 return result 

196 

197 def evaluate(self, x) -> float | int: 

198 return self.f_evaluate(x, self._fs_temp) 

199 

200 def register(self, x, f: int | float) -> None: 

201 raise ValueError( 

202 "register is not supported in multi-objective optimization") 

203 

204 def get_archive(self) -> list[MORecord]: 

205 return self._archive[0:self._archive_size] 

206 

207 def get_copy_of_best_fs(self, fs: np.ndarray) -> None: 

208 if self._current_fes > 0: 

209 return copyto(fs, self._current_best_fs) 

210 raise ValueError("No current best available.") 

211 

212 def _log_own_parameters(self, logger: KeyValueLogSection) -> None: 

213 super()._log_own_parameters(logger) 

214 logger.key_value(KEY_ARCHIVE_MAX_SIZE, self._archive_max_size) 

215 logger.key_value(KEY_ARCHIVE_PRUNE_LIMIT, self._archive_prune_limit) 

216 

217 def log_parameters_to(self, logger: KeyValueLogSection) -> None: 

218 _ProcessBase.log_parameters_to(self, logger) 

219 with logger.scope(SCOPE_PRUNER) as sc: 

220 self._pruner.log_parameters_to(sc) 

221 

222 def _log_best(self, kv: KeyValueLogSection) -> None: 

223 super()._log_best(kv) 

224 kv.key_value(KEY_BEST_FS, array_to_str(self._current_best_fs)) 

225 kv.key_value(KEY_ARCHIVE_SIZE, self._archive_size) 

226 

227 def _log_and_check_archive_entry(self, index: int, rec: MORecord, 

228 logger: Logger) -> int | float: 

229 """ 

230 Write an archive entry. 

231 

232 :param index: the index of the entry 

233 :param rec: the record to verify 

234 :param logger: the logger 

235 :returns: the objective value 

236 """ 

237 self.f_validate(rec.fs) 

238 self.validate(rec.x) 

239 tfs: Final[np.ndarray] = self._fs_temp 

240 f: Final[int | float] = self._f_evaluate(rec.x, tfs) 

241 if not np.array_equal(tfs, rec.fs): 

242 raise ValueError( 

243 f"expected {rec.fs} but got {tfs} when re-evaluating {rec}") 

244 if not isinstance(f, int | float): 

245 raise type_error(f, "scalarized objective value", (int, float)) 

246 if not isfinite(f): 

247 raise ValueError(f"scalarized objective value {f} is not finite") 

248 

249 with logger.text(f"{PREFIX_SECTION_ARCHIVE}{index}" 

250 f"{SUFFIX_SECTION_ARCHIVE_Y}") as lg: 

251 lg.write(self.to_str(rec.x)) 

252 return f 

253 

254 def _write_log(self, logger: Logger) -> None: 

255 super()._write_log(logger) 

256 

257 if self._archive_size > 0: 

258 # write and verify the archive 

259 archive: Final[list[MORecord]] = \ 

260 self._archive[0:self._archive_size] 

261 archive.sort() 

262 qualities: Final[list[list[int | float]]] = [] 

263 for i, rec in enumerate(archive): 

264 q: list[int | float] = [ 

265 np_to_py_number(n) for n in rec.fs] 

266 q.insert(0, self._log_and_check_archive_entry(i, rec, logger)) 

267 qualities.append(q) 

268 

269 # now write the qualities 

270 headline: list[str] = [ 

271 f"{KEY_ARCHIVE_F}{i}" for i in range(self.f_dimension())] 

272 headline.insert(0, KEY_ARCHIVE_F) 

273 with logger.csv(SECTION_ARCHIVE_QUALITY, headline) as csv: 

274 for qq in qualities: 

275 csv.row(qq) 

276 

277 def _write_mo_log(self, 

278 log: list[list[int | float | np.ndarray]], 

279 start_time: int, 

280 keep_all: bool, 

281 logger: Logger) -> None: 

282 """ 

283 Write the multi-objective log to the logger. 

284 

285 :param log: the log 

286 :param start_time: the start time 

287 :param keep_all: do we need to keep all entries? 

288 :param logger: the destination logger 

289 """ 

290 loglen = len(log) 

291 if loglen <= 0: 

292 return 

293 

294 domination: Final[Callable[[np.ndarray, np.ndarray], int]] \ 

295 = self.f_dominates 

296 

297 if not keep_all: 

298 # first we clean the log from potentially dominated entries 

299 for i in range(loglen - 1, 0, -1): 

300 reci = log[i] 

301 fi = cast("int | float", reci[2]) 

302 fsi = cast("np.ndarray", reci[3]) 

303 for j in range(i - 1, -1, -1): 

304 recj = log[j] 

305 fj = cast("int | float", recj[2]) 

306 fsj = cast("np.ndarray", recj[3]) 

307 if (fj <= fi) and (domination(fsi, fsj) > 0): 

308 del log[i] 

309 break 

310 

311 header: list[str] = [PROGRESS_FES, PROGRESS_TIME_MILLIS, 

312 PROGRESS_CURRENT_F] 

313 header.extend(f"{PROGRESS_CURRENT_F}{i}" for i in range( 

314 len(cast("np.ndarray", log[0])[3]))) 

315 

316 with logger.csv(SECTION_PROGRESS, header) as csv: 

317 for row in log: 

318 srow = [row[0], _ns_to_ms(cast("int", row[1]) 

319 - start_time), row[2]] 

320 srow.extend([np_to_py_number(n) 

321 for n in cast("np.ndarray", row[3])]) 

322 csv.row(srow) 

323 

324 def __str__(self) -> str: 

325 return "MOProcessWithoutSearchSpace"