Coverage for moptipy / evaluation / ert.py: 68%

193 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1""" 

2Approximate the expected running time to reach certain goals. 

3 

4The (empirically estimated) Expected Running Time (ERT) tries to give an 

5impression of how long an algorithm needs to reach a certain solution quality. 

6 

7The ERT for a problem instance is estimated as the ratio of the sum of all FEs 

8that all the runs consumed until they either have discovered a solution of a 

9given goal quality or exhausted their budget, divided by the number of runs 

10that discovered a solution of the goal quality. The ERT is the mean expect 

11runtime under the assumption of independent restarts after failed runs, which 

12then may either succeed (consuming the mean runtime of the successful runs) or 

13fail again (with the observed failure probability, after consuming the 

14available budget). 

15 

16The ERT itself can be considered as a function that associates the estimated 

17runtime given above to all possible solution qualities that can be attained by 

18an algorithm for a give problem. For qualities/goals that an algorithm did not 

19attain in any run, the ERT becomes infinite. 

20 

211. Kenneth V. Price. Differential Evolution vs. The Functions of the 2nd ICEO. 

22 In Russ Eberhart, Peter Angeline, Thomas Back, Zbigniew Michalewicz, and 

23 Xin Yao, editors, *IEEE International Conference on Evolutionary 

24 Computation,* April 13-16, 1997, Indianapolis, IN, USA, pages 153-157. 

25 IEEE Computational Intelligence Society. ISBN: 0-7803-3949-5. 

26 doi: https://doi.org/10.1109/ICEC.1997.592287 

272. Nikolaus Hansen, Anne Auger, Steffen Finck, Raymond Ros. *Real-Parameter 

28 Black-Box Optimization Benchmarking 2010: Experimental Setup.* 

29 Research Report RR-7215, INRIA. 2010. inria-00462481. 

30 https://hal.inria.fr/inria-00462481/document/ 

31""" 

32 

33from dataclasses import dataclass 

34from math import inf, isfinite 

35from typing import Any, Callable, Final, Iterable, cast 

36 

37import numpy as np 

38from pycommons.io.console import logger 

39from pycommons.io.csv import COMMENT_START, CSV_SEPARATOR 

40from pycommons.io.path import Path 

41from pycommons.strings.string_conv import num_to_str 

42from pycommons.types import type_error 

43 

44from moptipy.api.logging import ( 

45 KEY_ALGORITHM, 

46 KEY_INSTANCE, 

47) 

48from moptipy.evaluation._utils import _get_goal_reach_index 

49from moptipy.evaluation.base import ( 

50 F_NAME_NORMALIZED, 

51 F_NAME_RAW, 

52 F_NAME_SCALED, 

53 KEY_ENCODING, 

54 KEY_N, 

55 KEY_OBJECTIVE_FUNCTION, 

56 MultiRun2DData, 

57) 

58from moptipy.evaluation.progress import Progress 

59from moptipy.utils.logger import ( 

60 KEY_VALUE_SEPARATOR, 

61) 

62from moptipy.utils.nputils import DEFAULT_FLOAT, DEFAULT_INT, is_all_finite 

63 

64 

65def compute_single_ert(source: Iterable[Progress], 

66 goal_f: int | float) -> float: 

67 """ 

68 Compute a single ERT. 

69 

70 The ERT is the sum of the time that the runs spend with a 

71 best-so-far quality greater or equal than `goal_f` divided by the 

72 number of runs that reached `goal_f`. The idea is that the 

73 unsuccessful runs spent their complete computational budget and 

74 once they have terminated, we would immediately start a new, 

75 independent run. 

76 

77 Warning: `source` must only contain progress objects that contain 

78 monotonously improving points. It must not contain runs that may get 

79 worse over time. 

80 

81 :param source: the source array 

82 :param goal_f: the goal objective value 

83 :return: the ERT 

84 

85 >>> from moptipy.evaluation.progress import Progress as Pr 

86 >>> from numpy import array as a 

87 >>> f = "plainF" 

88 >>> t = "FEs" 

89 >>> r = [Pr("a", "i", "f", "e", 1, a([1, 4, 8]), t, a([10, 8, 5]), f), 

90 ... Pr("a", "i", "f", "e", 2, a([1, 3, 6]), t, a([9, 7, 4]), f), 

91 ... Pr("a", "i", "f", "e", 3, a([1, 2, 7, 9]), t, a([8, 7, 6, 3]), f), 

92 ... Pr("a", "i", "f", "e", 4, a([1, 12]), t, a([9, 3]), f)] 

93 >>> print(compute_single_ert(r, 11)) 

94 1.0 

95 >>> print(compute_single_ert(r, 10)) 

96 1.0 

97 >>> print(compute_single_ert(r, 9.5)) # (4 + 1 + 1 + 1) / 4 = 1.75 

98 1.75 

99 >>> print(compute_single_ert(r, 9)) # (4 + 1 + 1 + 1) / 4 = 1.75 

100 1.75 

101 >>> print(compute_single_ert(r, 8.5)) # (4 + 3 + 1 + 12) / 4 = 5 

102 5.0 

103 >>> print(compute_single_ert(r, 8)) # (4 + 3 + 1 + 12) / 4 = 5 

104 5.0 

105 >>> print(compute_single_ert(r, 7.3)) # (8 + 3 + 2 + 12) / 4 = 6.25 

106 6.25 

107 >>> print(compute_single_ert(r, 7)) # (8 + 3 + 2 + 12) / 4 = 6.25 

108 6.25 

109 >>> print(compute_single_ert(r, 6.1)) # (8 + 6 + 7 + 12) / 4 = 8.25 

110 8.25 

111 >>> print(compute_single_ert(r, 6)) # (8 + 6 + 7 + 12) / 4 = 8.25 

112 8.25 

113 >>> print(compute_single_ert(r, 5.7)) # (8 + 6 + 9 + 12) / 4 = 8.75 

114 8.75 

115 >>> print(compute_single_ert(r, 5)) # (8 + 6 + 9 + 12) / 4 = 8.75 

116 8.75 

117 >>> print(compute_single_ert(r, 4.2)) # (8 + 6 + 9 + 12) / 3 = 11.666... 

118 11.666666666666666 

119 >>> print(compute_single_ert(r, 4)) # (8 + 6 + 9 + 12) / 3 = 11.666... 

120 11.666666666666666 

121 >>> print(compute_single_ert(r, 3.8)) # (8 + 6 + 9 + 12) / 2 = 17.5 

122 17.5 

123 >>> print(compute_single_ert(r, 3)) # (8 + 6 + 9 + 12) / 2 = 17.5 

124 17.5 

125 >>> print(compute_single_ert(r, 2.9)) 

126 inf 

127 >>> print(compute_single_ert(r, 2)) 

128 inf 

129 """ 

130 n_success: int = 0 

131 time_sum: int = 0 

132 for progress in source: 

133 idx = _get_goal_reach_index(progress.f, goal_f) 

134 if idx >= 0: 

135 n_success += 1 

136 else: 

137 idx = cast("np.integer", -1) 

138 time_sum += int(progress.time[idx]) 

139 if time_sum <= 0: 

140 raise ValueError(f"Time sum cannot be {time_sum}.") 

141 if n_success <= 0: 

142 return inf 

143 return time_sum / n_success 

144 

145 

146@dataclass(frozen=True, init=False, order=False, eq=False) 

147class Ert(MultiRun2DData): 

148 """Estimate the Expected Running Time (ERT).""" 

149 

150 #: The ert function 

151 ert: np.ndarray 

152 

153 def __init__(self, 

154 algorithm: str | None, 

155 instance: str | None, 

156 objective: str | None, 

157 encoding: str | None, 

158 n: int, 

159 time_unit: str, 

160 f_name: str, 

161 ert: np.ndarray): 

162 """ 

163 Create the Ert function. 

164 

165 :param algorithm: the algorithm name, if all runs are with the same 

166 algorithm 

167 :param instance: the instance name, if all runs are on the same 

168 instance 

169 :param objective: the objective name, if all runs are on the same 

170 objective function, `None` otherwise 

171 :param encoding: the encoding name, if all runs are on the same 

172 encoding and an encoding was actually used, `None` otherwise 

173 :param n: the total number of runs 

174 :param time_unit: the time unit 

175 :param f_name: the objective dimension name 

176 :param ert: the ert matrix 

177 """ 

178 super().__init__(algorithm, instance, objective, encoding, n, 

179 time_unit, f_name) 

180 if not isinstance(ert, np.ndarray): 

181 raise type_error(ert, "ert", np.ndarray) 

182 ert.flags.writeable = False 

183 

184 f: Final[np.ndarray] = ert[:, 0] 

185 if not is_all_finite(f): 

186 raise ValueError( 

187 f"Ert x-axis must be all finite, but encountered {f}.") 

188 ll = f.size 

189 if (ll > 1) and np.any(f[1:] <= f[:-1]): 

190 raise ValueError("f data must be strictly increasing," 

191 f"but encountered {f}.") 

192 

193 t: Final[np.ndarray] = ert[:, 1] 

194 if np.isfinite(t[0]) or (np.isposinf(t[0])): 

195 if not is_all_finite(t[1:]): 

196 raise ValueError( 

197 "non-first ert y-axis elements must be all finite.") 

198 if np.any(t[1:] >= t[:-1]): 

199 raise ValueError("t data must be strictly decreasing," 

200 f"but encountered {t}.") 

201 else: 

202 raise ValueError( 

203 f"first ert y-axis element cannot be {t[0]}.") 

204 object.__setattr__(self, "ert", ert) 

205 

206 

207def to_csv(ert: Ert, file: str, 

208 put_header: bool = True) -> Path: 

209 """ 

210 Store a :class:`Ert` record in a CSV file. 

211 

212 :param ert: the ERT 

213 :param file: the file to generate 

214 :param put_header: should we put a header with meta-data? 

215 :return: the fully resolved file name 

216 """ 

217 if not isinstance(ert, Ert): 

218 raise type_error(ert, "ert", Ert) 

219 path: Final[Path] = Path(file) 

220 logger(f"Writing ERT to CSV file {path!r}.") 

221 path.ensure_parent_dir_exists() 

222 

223 with path.open_for_write() as out: 

224 sep: Final[str] = CSV_SEPARATOR 

225 write: Callable[[str], int] = out.write 

226 if put_header: 

227 kv: Final[str] = KEY_VALUE_SEPARATOR 

228 cmt: Final[str] = COMMENT_START 

229 if ert.algorithm is not None: 

230 write(f"{cmt} {KEY_ALGORITHM}{kv}{ert.algorithm}\n") 

231 if ert.instance is not None: 

232 write(f"{cmt} {KEY_INSTANCE}{kv}{ert.instance}\n") 

233 if ert.objective is not None: 

234 write(f"{cmt} {KEY_OBJECTIVE_FUNCTION}{kv}" 

235 f"{ert.objective}\n") 

236 if ert.encoding is not None: 

237 write(f"{cmt} {KEY_ENCODING}{kv}{ert.encoding}\n") 

238 write(f"{cmt} {KEY_N}{kv}{ert.n}\n") 

239 write(f"{ert.f_name}{sep}ert[{ert.time_unit}]\n") 

240 for v in ert.ert: 

241 write(f"{num_to_str(v[0])}{sep}{num_to_str(v[1])}\n") 

242 

243 logger(f"Done writing ERT to CSV file {path!r}.") 

244 path.enforce_file() 

245 return path 

246 

247 

248def create(source: Iterable[Progress], 

249 f_lower_bound: int | float | Callable | None = None, 

250 use_default_lower_bounds: bool = True) -> Ert: 

251 """ 

252 Create one single Ert record from an iterable of Progress records. 

253 

254 :param source: the set of progress instances 

255 :param f_lower_bound: the lower bound for the objective value, or a 

256 callable that is applied to a progress object to get the lower 

257 bound 

258 :param use_default_lower_bounds: should we use the default lower 

259 bounds 

260 :return: the Ert record 

261 """ 

262 if not isinstance(source, Iterable): 

263 raise type_error(source, "source", Iterable) 

264 

265 lower_bound: int | float = inf 

266 if (f_lower_bound is not None) and (not callable(f_lower_bound)): 

267 if not isfinite(f_lower_bound): 

268 raise ValueError("f_lower_bound must be finite " 

269 f"but is {f_lower_bound}.") 

270 lower_bound = f_lower_bound 

271 f_lower_bound = None 

272 

273 algorithm: str | None = None 

274 instance: str | None = None 

275 objective: str | None = None 

276 encoding: str | None = None 

277 time_unit: str | None = None 

278 f_name: str | None = None 

279 f_list: list[np.ndarray] = [] 

280 n: int = 0 

281 

282 prgs: Final[list[Progress]] = cast("list[Progress]", source) \ 

283 if isinstance(source, list) else list(source) 

284 

285 for progress in prgs: 

286 if not isinstance(progress, Progress): 

287 raise type_error(progress, "progress input", Progress) 

288 if n <= 0: 

289 algorithm = progress.algorithm 

290 instance = progress.instance 

291 objective = progress.objective 

292 encoding = progress.encoding 

293 time_unit = progress.time_unit 

294 f_name = progress.f_name 

295 else: 

296 if algorithm != progress.algorithm: 

297 algorithm = None 

298 if instance != progress.instance: 

299 instance = None 

300 if objective != progress.objective: 

301 objective = None 

302 if encoding != progress.encoding: 

303 encoding = None 

304 if time_unit != progress.time_unit: 

305 raise ValueError( 

306 f"Cannot mix time units {time_unit} " 

307 f"and {progress.time_unit}.") 

308 if f_name != progress.f_name: 

309 raise ValueError(f"Cannot mix f-names {f_name} " 

310 f"and {progress.f_name}.") 

311 n += 1 

312 if use_default_lower_bounds and \ 

313 (progress.f_standard is not None) and \ 

314 (progress.f_name == F_NAME_RAW) and \ 

315 (lower_bound > progress.f_standard): 

316 lower_bound = progress.f_standard 

317 if f_lower_bound is not None: 

318 lb = f_lower_bound(progress) 

319 if not isinstance(lb, int | float): 

320 raise type_error(lb, "computed lower bound", (int, float)) 

321 if not isfinite(lb): 

322 raise ValueError(f"Invalid computed lower bound {lb}.") 

323 lower_bound = min(lower_bound, lb) 

324 f_list.append(progress.f) 

325 

326 if n <= 0: 

327 raise ValueError("Did not encounter any progress information.") 

328 

329 if use_default_lower_bounds: 

330 if f_name == F_NAME_SCALED: 

331 lower_bound = min(lower_bound, 1) 

332 elif f_name == F_NAME_NORMALIZED: 

333 lower_bound = min(lower_bound, 0) 

334 

335 # get unique x-values and make sure that lower bound is included 

336 has_lb: Final[bool] = isfinite(lower_bound) 

337 if has_lb: 

338 f_list.append(np.array([lower_bound])) 

339 x = np.concatenate(f_list) 

340 del f_list 

341 x = np.unique(x) 

342 if has_lb: 

343 x = x[x >= lower_bound] 

344 base_len: Final[int] = x.size 

345 

346 # prepare for backward iteration over arrays 

347 indices = np.array([(ppr.f.size - 1) for ppr in prgs], 

348 dtype=DEFAULT_INT) 

349 y = np.empty(base_len, dtype=DEFAULT_FLOAT) 

350 

351 for out_index in range(base_len): 

352 f_lim = x[out_index] 

353 found = 0 

354 time_sum: int = 0 

355 for r_idx, pr in enumerate(prgs): 

356 idx = indices[r_idx] 

357 f_vals = pr.f 

358 # did we fulfill the limit? 

359 if f_vals[idx] <= f_lim: 

360 found += 1 

361 # can we move time cursor back? 

362 if (idx > 0) and (f_vals[idx - 1] <= f_lim): 

363 idx -= 1 

364 indices[r_idx] = idx 

365 else: 

366 # condition not fulfilled, need to use maximum time 

367 idx = -1 

368 time_sum += int(pr.time[idx]) 

369 

370 # compute ert value: infinite if no run meets condition 

371 y[out_index] = inf if (found <= 0) else time_sum / found 

372 

373 # convert the two arrays into one matrix 

374 ert = np.concatenate((x, y)).reshape((base_len, 2), order="F") 

375 

376 return Ert(algorithm, instance, objective, encoding, n, 

377 time_unit, f_name, ert) 

378 

379 

380def from_progresses(source: Iterable[Progress], 

381 consumer: Callable[[Ert], Any], 

382 f_lower_bound: float | None = None, 

383 use_default_lower_bounds: bool = True, 

384 join_all_algorithms: bool = False, 

385 join_all_instances: bool = False, 

386 join_all_objectives: bool = False, 

387 join_all_encodings: bool = False) -> None: 

388 """ 

389 Compute one or multiple ERTs from a stream of end results. 

390 

391 :param source: the set of progress instances 

392 :param f_lower_bound: the lower bound for the objective value 

393 :param use_default_lower_bounds: should we use the default lower 

394 bounds 

395 :param consumer: the destination to which the new records will be 

396 passed, can be the `append` method of a :class:`list` 

397 :param join_all_algorithms: should the Ert be aggregated over all 

398 algorithms 

399 :param join_all_instances: should the Ert be aggregated over all 

400 algorithms 

401 :param join_all_objectives: should the statistics be aggregated over 

402 all objective functions? 

403 :param join_all_encodings: should the statistics be aggregated over 

404 all encodings? 

405 """ 

406 if not isinstance(source, Iterable): 

407 raise type_error(source, "source", Iterable) 

408 if not callable(consumer): 

409 raise type_error(consumer, "consumer", call=True) 

410 if not isinstance(join_all_algorithms, bool): 

411 raise type_error(join_all_algorithms, "join_all_algorithms", bool) 

412 if not isinstance(join_all_instances, bool): 

413 raise type_error(join_all_instances, "join_all_instances", bool) 

414 

415 if (join_all_algorithms and join_all_instances and join_all_objectives 

416 and join_all_encodings): 

417 consumer(create(source, f_lower_bound, use_default_lower_bounds)) 

418 return 

419 

420 sorter: dict[tuple[str, str, str, str], list[Progress]] = {} 

421 for er in source: 

422 if not isinstance(er, Progress): 

423 raise type_error(er, "progress source", Progress) 

424 key = ("" if join_all_algorithms else er.algorithm, 

425 "" if join_all_instances else er.instance, 

426 "" if join_all_objectives else er.objective, 

427 "" if join_all_encodings else ( 

428 "" if er.encoding is None else er.encoding)) 

429 if key in sorter: 

430 lst = sorter[key] 

431 else: 

432 lst = [] 

433 sorter[key] = lst 

434 lst.append(er) 

435 

436 if len(sorter) <= 0: 

437 raise ValueError("source must not be empty") 

438 

439 if len(sorter) > 1: 

440 keyz = sorted(sorter.keys()) 

441 for key in keyz: 

442 consumer(create(sorter[key], f_lower_bound, 

443 use_default_lower_bounds)) 

444 else: 

445 consumer(create(next(iter(sorter.values())), f_lower_bound, 

446 use_default_lower_bounds))