Coverage for moptipy / api / _mo_process_no_ss.py: 94%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 10:36 +0000

1"""Providing a multi-objective process without logging with a single space.""" 

2 

3from math import isfinite 

4from typing import Any, Callable, Final, cast 

5 

6import numpy as np 

7from numpy import copyto 

8from pycommons.io.csv import CSV_SEPARATOR 

9from pycommons.io.path import Path 

10from pycommons.strings.string_conv import num_to_str 

11from pycommons.types import check_int_range, type_error 

12 

13from moptipy.api._process_base import _TIME_IN_NS, _ns_to_ms, _ProcessBase 

14from moptipy.api.algorithm import Algorithm 

15from moptipy.api.improvement_logger import ImprovementLogger 

16from moptipy.api.logging import ( 

17 KEY_ARCHIVE_F, 

18 KEY_ARCHIVE_MAX_SIZE, 

19 KEY_ARCHIVE_PRUNE_LIMIT, 

20 KEY_ARCHIVE_SIZE, 

21 KEY_BEST_FS, 

22 KEY_CURRENT_F, 

23 KEY_CURRENT_FS, 

24 PREFIX_SECTION_ARCHIVE, 

25 PROGRESS_CURRENT_F, 

26 PROGRESS_FES, 

27 PROGRESS_TIME_MILLIS, 

28 SCOPE_PRUNER, 

29 SECTION_ARCHIVE_QUALITY, 

30 SECTION_PROGRESS, 

31 SECTION_RESULT_X, 

32 SECTION_RESULT_Y, 

33 SUFFIX_SECTION_ARCHIVE_Y, 

34) 

35from moptipy.api.mo_archive import ( 

36 MOArchivePruner, 

37 MORecord, 

38 check_mo_archive_pruner, 

39) 

40from moptipy.api.mo_problem import MOProblem 

41from moptipy.api.mo_process import MOProcess 

42from moptipy.api.space import Space 

43from moptipy.utils.logger import KeyValueLogSection, Logger 

44from moptipy.utils.nputils import array_to_str, np_to_py_number 

45 

46 

class _MOProcessNoSS(MOProcess, _ProcessBase):
    """
    An internal multi-objective process implementation.

    This class implements a stand-alone multi-objective optimization process
    without explicit search-space logging, for the case where the search
    space and the solution space are the same.  It maintains an archive of
    mutually non-dominated solutions which is pruned by the configured
    pruner whenever it grows past a configurable limit.
    """

54 

    def __init__(self,
                 solution_space: Space,
                 objective: MOProblem,
                 algorithm: Algorithm,
                 pruner: MOArchivePruner,
                 archive_max_size: int,
                 archive_prune_limit: int,
                 log_file: Path | None = None,
                 rand_seed: int | None = None,
                 max_fes: int | None = None,
                 max_time_millis: int | None = None,
                 goal_f: int | float | None = None,
                 improvement_logger: ImprovementLogger | None = None) -> None:
        """
        Perform the internal initialization. Do not call directly.

        :param solution_space: the search- and solution space.
        :param objective: the multi-objective optimization problem
        :param algorithm: the optimization algorithm
        :param pruner: the archive pruner
        :param archive_max_size: the maximum archive size after pruning
        :param archive_prune_limit: the archive size above which pruning will
            be performed
        :param log_file: the optional log file
        :param rand_seed: the optional random seed
        :param max_fes: the maximum permitted function evaluations
        :param max_time_millis: the maximum runtime in milliseconds
        :param goal_f: the goal objective value. if it is reached, the process
            is terminated
        :param improvement_logger: an improvement logger, whose
            :meth:`~ImprovementLogger.log_improvement` method will be invoked
            whenever the process has registered an improvement
        """
        # Delegate the single-objective/base bookkeeping (budget, seed,
        # logging) to the base-class initializer.
        _ProcessBase.__init__(
            self,
            solution_space=solution_space,
            objective=objective,
            algorithm=algorithm,
            log_file=log_file,
            rand_seed=rand_seed,
            max_fes=max_fes,
            max_time_millis=max_time_millis,
            goal_f=goal_f,
            improvement_logger=improvement_logger)
        # Re-export the multi-objective facets of the problem as attributes
        # of the process, so algorithms reach them without going through
        # the objective object (the `type: ignore` silences the method
        # re-assignment warnings).
        self.f_dimension = objective.f_dimension  # type: ignore
        self.f_create = objective.f_create  # type: ignore
        self.f_validate = objective.f_validate  # type: ignore
        self.f_dtype = objective.f_dtype  # type: ignore
        self.f_dominates = objective.f_dominates  # type: ignore

        #: the internal evaluation function
        self._f_evaluate: Final[Callable[
            [Any, np.ndarray], int | float]] = objective.f_evaluate

        #: the temporary variable for objective function evaluations
        self._fs_temp: Final[np.ndarray] = self.f_create()
        #: the holder for the objective vector of the current best solution
        self._current_best_fs: Final[np.ndarray] = self.f_create()
        #: the internal archive pruner
        self._pruner: Final[MOArchivePruner] = check_mo_archive_pruner(pruner)
        #: the fast call to the pruning routine
        self._prune: Final[Callable[[list[MORecord], int, int], None]] \
            = pruner.prune
        #: the archive prune limit; `check_int_range` rejects invalid values
        self._archive_prune_limit: Final[int] = check_int_range(
            archive_prune_limit, "archive_prune_limit")
        #: the maximum archive size; must be in 1..archive_prune_limit
        self._archive_max_size: Final[int] = check_int_range(
            archive_max_size, "archive_max_size", 1, archive_prune_limit)
        #: the current archive size
        self._archive_size: int = 0
        #: the internal archive (pre-allocated to the prune limit)
        self._archive: Final[list[MORecord]] = []

128 

129 def _after_init(self) -> None: 

130 self._archive.extend( 

131 MORecord(self.create(), self.f_create()) 

132 for _ in range(self._archive_prune_limit)) 

133 super()._after_init() 

134 

    def check_in(self, x: Any, fs: np.ndarray,
                 prune_if_necessary: bool = False) -> bool:
        """
        Check a solution into the archive.

        The archive invariantly holds only mutually non-dominated records in
        the first `self._archive_size` slots of `self._archive`; the slots
        behind them are kept as recyclable spares.

        :param x: the point in the search space
        :param fs: the vector of objective values
        :param prune_if_necessary: should we prune the archive if it becomes
            too large? `False` means that the archive may grow unbounded
        :returns: `True` if the solution was non-dominated, `False` if it was
            dominated by at least one solution in the archive
        """
        archive: Final[list[MORecord]] = self._archive
        added_to_archive: bool = False
        archive_size: int = self._archive_size
        # we update the archive
        domination: Final[Callable[[np.ndarray, np.ndarray], int]] \
            = self.f_dominates
        # Walk backwards so that shrinking the live region by swapping a
        # dominated record behind the end does not disturb unvisited indices.
        for i in range(archive_size - 1, -1, -1):
            ae: MORecord = archive[i]
            d: int = domination(fs, ae.fs)
            if d < 0:  # the new solution dominates an archived one
                if added_to_archive:  # if already added, shrink archive
                    # swap the dominated record behind the live region so
                    # its buffers can be recycled later
                    archive_size -= 1
                    archive[archive_size], archive[i] = \
                        ae, archive[archive_size]
                else:  # if not added, overwrite dominated solution
                    self.copy(ae.x, x)
                    copyto(ae.fs, fs)
                    added_to_archive = True
            elif d > 0:
                return False

        if added_to_archive:  # already added, can quit
            self._archive_size = archive_size
        else:  # still need to add
            if archive_size >= len(archive):
                # no spare record available: allocate a fresh one
                ae = MORecord(self.create(), self.f_create())
                archive.append(ae)
            else:
                # recycle the spare record right behind the live region
                ae = archive[archive_size]
            self.copy(ae.x, x)
            copyto(ae.fs, fs)
            archive_size += 1
            if prune_if_necessary \
                    and (archive_size > self._archive_prune_limit):
                # NOTE(review): this assumes the pruner leaves exactly
                # `_archive_max_size` live records at the front of the
                # list — confirm against the MOArchivePruner contract.
                self._prune(archive, self._archive_max_size, archive_size)
                self._archive_size = self._archive_max_size
            else:
                self._archive_size = archive_size
        return True

186 

    def f_evaluate(self, x, fs: np.ndarray) -> float | int:
        """
        Evaluate `x`, writing the objective vector into `fs`.

        Returns the scalarized objective value and updates the best-so-far
        solution, the archive, the FE counter, the improvement timestamps,
        and the termination state.

        :param x: the candidate solution to evaluate
        :param fs: the destination array for the objective values
        :returns: the scalarized objective value
        :raises ValueError: if called after the process terminated and the
            algorithm has already been told so
        """
        if self._terminated:
            # After termination: error out if the algorithm was informed,
            # otherwise just replay the best scalarized value seen so far.
            if self._knows_that_terminated:
                raise ValueError("The process has been terminated and "
                                 "the algorithm knows it.")
            return self._current_best_f

        result: Final[int | float] = self._f_evaluate(x, fs)
        self._current_fes = current_fes = self._current_fes + 1
        do_term: bool = current_fes >= self._end_fes

        improved: bool = False
        if result < self._current_best_f:
            # strict improvement of the scalarized value: remember solution
            improved = True
            self._current_best_f = result
            copyto(self._current_best_fs, fs)
            self._copy_y(self._current_best_y, x)
            do_term = do_term or (result <= self._end_f)

        # always offer the solution to the archive (pruning enabled); an
        # archive acceptance also counts as an improvement event
        if self.check_in(x, fs, True) or improved:
            if self._log_improvement:
                # bind current values via default arguments so the deferred
                # lambda does not suffer from late binding
                self._log_improvement(
                    cast("Callable[[Logger], None]",
                         lambda lg, _x=x, _f=result, _fs=fs:
                         self._write_improvement(lg, None, _x, _f, _fs)))
            self._last_improvement_fe = current_fes
            self._current_time_nanos = ctn = _TIME_IN_NS()
            self._last_improvement_time_nanos = ctn

        if do_term:
            self.terminate()

        return result

220 

221 def evaluate(self, x) -> float | int: 

222 return self.f_evaluate(x, self._fs_temp) 

223 

224 def register(self, x, f: int | float) -> None: 

225 raise ValueError( 

226 "register is not supported in multi-objective optimization") 

227 

228 def get_archive(self) -> list[MORecord]: 

229 return self._archive[0:self._archive_size] 

230 

231 def get_copy_of_best_fs(self, fs: np.ndarray) -> None: 

232 if self._current_fes > 0: 

233 return copyto(fs, self._current_best_fs) 

234 raise ValueError("No current best available.") 

235 

236 def _log_own_parameters(self, logger: KeyValueLogSection) -> None: 

237 super()._log_own_parameters(logger) 

238 logger.key_value(KEY_ARCHIVE_MAX_SIZE, self._archive_max_size) 

239 logger.key_value(KEY_ARCHIVE_PRUNE_LIMIT, self._archive_prune_limit) 

240 

241 def log_parameters_to(self, logger: KeyValueLogSection) -> None: 

242 _ProcessBase.log_parameters_to(self, logger) 

243 with logger.scope(SCOPE_PRUNER) as sc: 

244 self._pruner.log_parameters_to(sc) 

245 

246 def _log_best(self, kv: KeyValueLogSection) -> None: 

247 super()._log_best(kv) 

248 kv.key_value(KEY_BEST_FS, array_to_str(self._current_best_fs)) 

249 kv.key_value(KEY_ARCHIVE_SIZE, self._archive_size) 

250 

251 def _log_and_check_archive_entry(self, index: int, rec: MORecord, 

252 logger: Logger) -> int | float: 

253 """ 

254 Write an archive entry. 

255 

256 :param index: the index of the entry 

257 :param rec: the record to verify 

258 :param logger: the logger 

259 :returns: the objective value 

260 """ 

261 self.f_validate(rec.fs) 

262 self.validate(rec.x) 

263 tfs: Final[np.ndarray] = self._fs_temp 

264 f: Final[int | float] = self._f_evaluate(rec.x, tfs) 

265 if not np.array_equal(tfs, rec.fs): 

266 raise ValueError( 

267 f"expected {rec.fs} but got {tfs} when re-evaluating {rec}") 

268 if not isinstance(f, int | float): 

269 raise type_error(f, "scalarized objective value", (int, float)) 

270 if not isfinite(f): 

271 raise ValueError(f"scalarized objective value {f} is not finite") 

272 

273 with logger.text(f"{PREFIX_SECTION_ARCHIVE}{index}" 

274 f"{SUFFIX_SECTION_ARCHIVE_Y}") as lg: 

275 lg.write(self.to_str(rec.x)) 

276 return f 

277 

278 def _write_log(self, logger: Logger) -> None: 

279 super()._write_log(logger) 

280 

281 if self._archive_size > 0: 

282 # write and verify the archive 

283 archive: Final[list[MORecord]] = \ 

284 self._archive[0:self._archive_size] 

285 archive.sort() 

286 qualities: Final[list[list[int | float]]] = [] 

287 for i, rec in enumerate(archive): 

288 q: list[int | float] = [ 

289 np_to_py_number(n) for n in rec.fs] 

290 q.insert(0, self._log_and_check_archive_entry(i, rec, logger)) 

291 qualities.append(q) 

292 

293 # now write the qualities 

294 headline: list[str] = [ 

295 f"{KEY_ARCHIVE_F}{i}" for i in range(self.f_dimension())] 

296 headline.insert(0, KEY_ARCHIVE_F) 

297 with logger.csv(SECTION_ARCHIVE_QUALITY, headline) as csv: 

298 for qq in qualities: 

299 csv.row(qq) 

300 

301 def _write_mo_log(self, 

302 log: list[list[int | float | np.ndarray]], 

303 start_time: int, 

304 keep_all: bool, 

305 logger: Logger) -> None: 

306 """ 

307 Write the multi-objective log to the logger. 

308 

309 :param log: the log 

310 :param start_time: the start time 

311 :param keep_all: do we need to keep all entries? 

312 :param logger: the destination logger 

313 """ 

314 loglen = len(log) 

315 if loglen <= 0: 

316 return 

317 

318 domination: Final[Callable[[np.ndarray, np.ndarray], int]] \ 

319 = self.f_dominates 

320 

321 if not keep_all: 

322 # first we clean the log from potentially dominated entries 

323 for i in range(loglen - 1, 0, -1): 

324 reci = log[i] 

325 fi = cast("int | float", reci[2]) 

326 fsi = cast("np.ndarray", reci[3]) 

327 for j in range(i - 1, -1, -1): 

328 recj = log[j] 

329 fj = cast("int | float", recj[2]) 

330 fsj = cast("np.ndarray", recj[3]) 

331 if (fj <= fi) and (domination(fsi, fsj) > 0): 

332 del log[i] 

333 break 

334 

335 header: list[str] = [PROGRESS_FES, PROGRESS_TIME_MILLIS, 

336 PROGRESS_CURRENT_F] 

337 header.extend(f"{PROGRESS_CURRENT_F}{i}" for i in range( 

338 len(cast("np.ndarray", log[0])[3]))) 

339 

340 with logger.csv(SECTION_PROGRESS, header) as csv: 

341 for row in log: 

342 srow = [row[0], _ns_to_ms(cast("int", row[1]) 

343 - start_time), row[2]] 

344 srow.extend([np_to_py_number(n) 

345 for n in cast("np.ndarray", row[3])]) 

346 csv.row(srow) 

347 

    def __str__(self) -> str:
        """Return the canonical name of this process implementation."""
        return "MOProcessWithoutSearchSpace"

350 

351 def _write_improvement(self, logger: Logger, x, y, 

352 f: int | float, fs: np.ndarray) -> None: 

353 """ 

354 Write an improvement to the logger. 

355 

356 :param logger: the logger 

357 :param x: the point in the search space 

358 :param y: the point in the solution space 

359 :param f: the objective value 

360 :param fs: the vector with the objective values 

361 """ 

362 self._write_state_and_setup( 

363 logger, ((KEY_CURRENT_F, num_to_str(f)), 

364 (KEY_CURRENT_FS, CSV_SEPARATOR.join(map( 

365 num_to_str, map(np_to_py_number, fs)))))) 

366 if x is not None: 

367 with logger.text(SECTION_RESULT_X) as txt: 

368 txt.write(x) 

369 with logger.text(SECTION_RESULT_Y) as txt: 

370 txt.write(self._solution_space.to_str(y))