Coverage for moptipy / evaluation / ert.py: 68%
193 statements
1"""
2Approximate the expected running time to reach certain goals.
4The (empirically estimated) Expected Running Time (ERT) tries to give an
5impression of how long an algorithm needs to reach a certain solution quality.
7The ERT for a problem instance is estimated as the ratio of the sum of all FEs
8that all the runs consumed until they either have discovered a solution of a
9given goal quality or exhausted their budget, divided by the number of runs
10that discovered a solution of the goal quality. The ERT is the mean expect
11runtime under the assumption of independent restarts after failed runs, which
12then may either succeed (consuming the mean runtime of the successful runs) or
13fail again (with the observed failure probability, after consuming the
14available budget).
16The ERT itself can be considered as a function that associates the estimated
17runtime given above to all possible solution qualities that can be attained by
18an algorithm for a give problem. For qualities/goals that an algorithm did not
19attain in any run, the ERT becomes infinite.
211. Kenneth V. Price. Differential Evolution vs. The Functions of the 2nd ICEO.
22 In Russ Eberhart, Peter Angeline, Thomas Back, Zbigniew Michalewicz, and
23 Xin Yao, editors, *IEEE International Conference on Evolutionary
24 Computation,* April 13-16, 1997, Indianapolis, IN, USA, pages 153-157.
25 IEEE Computational Intelligence Society. ISBN: 0-7803-3949-5.
26 doi: https://doi.org/10.1109/ICEC.1997.592287
272. Nikolaus Hansen, Anne Auger, Steffen Finck, Raymond Ros. *Real-Parameter
28 Black-Box Optimization Benchmarking 2010: Experimental Setup.*
29 Research Report RR-7215, INRIA. 2010. inria-00462481.
30 https://hal.inria.fr/inria-00462481/document/
31"""
from dataclasses import dataclass
from math import inf, isfinite
from typing import Any, Callable, Final, Iterable, cast

import numpy as np
from pycommons.io.console import logger
from pycommons.io.csv import COMMENT_START, CSV_SEPARATOR
from pycommons.io.path import Path
from pycommons.strings.string_conv import num_to_str
from pycommons.types import type_error

from moptipy.api.logging import (
    KEY_ALGORITHM,
    KEY_INSTANCE,
)
from moptipy.evaluation._utils import _get_goal_reach_index
from moptipy.evaluation.base import (
    F_NAME_NORMALIZED,
    F_NAME_RAW,
    F_NAME_SCALED,
    KEY_ENCODING,
    KEY_N,
    KEY_OBJECTIVE_FUNCTION,
    MultiRun2DData,
)
from moptipy.evaluation.progress import Progress
from moptipy.utils.logger import (
    KEY_VALUE_SEPARATOR,
)
from moptipy.utils.nputils import DEFAULT_FLOAT, DEFAULT_INT, is_all_finite


def compute_single_ert(source: Iterable[Progress],
                       goal_f: int | float) -> float:
    """
    Compute a single ERT.

    The ERT is the sum of the time that the runs spent with a
    best-so-far quality greater than or equal to `goal_f`, divided by the
    number of runs that reached `goal_f`. The idea is that the
    unsuccessful runs spent their complete computational budget and,
    once they have terminated, we would immediately start a new,
    independent run.

    Warning: `source` must only contain progress objects that contain
    monotonically improving points. It must not contain runs that may get
    worse over time.

    :param source: the source array
    :param goal_f: the goal objective value
    :return: the ERT

    >>> from moptipy.evaluation.progress import Progress as Pr
    >>> from numpy import array as a
    >>> f = "plainF"
    >>> t = "FEs"
    >>> r = [Pr("a", "i", "f", "e", 1, a([1, 4, 8]), t, a([10, 8, 5]), f),
    ...      Pr("a", "i", "f", "e", 2, a([1, 3, 6]), t, a([9, 7, 4]), f),
    ...      Pr("a", "i", "f", "e", 3, a([1, 2, 7, 9]), t, a([8, 7, 6, 3]), f),
    ...      Pr("a", "i", "f", "e", 4, a([1, 12]), t, a([9, 3]), f)]
    >>> print(compute_single_ert(r, 11))
    1.0
    >>> print(compute_single_ert(r, 10))
    1.0
    >>> print(compute_single_ert(r, 9.5))  # (4 + 1 + 1 + 1) / 4 = 1.75
    1.75
    >>> print(compute_single_ert(r, 9))  # (4 + 1 + 1 + 1) / 4 = 1.75
    1.75
    >>> print(compute_single_ert(r, 8.5))  # (4 + 3 + 1 + 12) / 4 = 5
    5.0
    >>> print(compute_single_ert(r, 8))  # (4 + 3 + 1 + 12) / 4 = 5
    5.0
    >>> print(compute_single_ert(r, 7.3))  # (8 + 3 + 2 + 12) / 4 = 6.25
    6.25
    >>> print(compute_single_ert(r, 7))  # (8 + 3 + 2 + 12) / 4 = 6.25
    6.25
    >>> print(compute_single_ert(r, 6.1))  # (8 + 6 + 7 + 12) / 4 = 8.25
    8.25
    >>> print(compute_single_ert(r, 6))  # (8 + 6 + 7 + 12) / 4 = 8.25
    8.25
    >>> print(compute_single_ert(r, 5.7))  # (8 + 6 + 9 + 12) / 4 = 8.75
    8.75
    >>> print(compute_single_ert(r, 5))  # (8 + 6 + 9 + 12) / 4 = 8.75
    8.75
    >>> print(compute_single_ert(r, 4.2))  # (8 + 6 + 9 + 12) / 3 = 11.666...
    11.666666666666666
    >>> print(compute_single_ert(r, 4))  # (8 + 6 + 9 + 12) / 3 = 11.666...
    11.666666666666666
    >>> print(compute_single_ert(r, 3.8))  # (8 + 6 + 9 + 12) / 2 = 17.5
    17.5
    >>> print(compute_single_ert(r, 3))  # (8 + 6 + 9 + 12) / 2 = 17.5
    17.5
    >>> print(compute_single_ert(r, 2.9))
    inf
    >>> print(compute_single_ert(r, 2))
    inf
    """
    n_success: int = 0
    time_sum: int = 0
    for progress in source:
        idx = _get_goal_reach_index(progress.f, goal_f)
        if idx >= 0:
            n_success += 1
        else:
            idx = cast("np.integer", -1)
        time_sum += int(progress.time[idx])
    if time_sum <= 0:
        raise ValueError(f"Time sum cannot be {time_sum}.")
    if n_success <= 0:
        return inf
    return time_sum / n_success
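
# Illustrative sketch (hypothetical data, commented out so that importing
# this module has no side effects): with a fixed budget B per run, the ERT
# equals the mean runtime of the successful runs plus (failures / successes)
# times B, which matches the restart interpretation in the module docstring.
#
# from numpy import array
# runs = [Progress("a", "i", "f", "e", 1, array([100]), "FEs",
#                  array([1]), "plainF"),
#         Progress("a", "i", "f", "e", 2, array([200]), "FEs",
#                  array([1]), "plainF"),
#         Progress("a", "i", "f", "e", 3, array([300]), "FEs",
#                  array([1]), "plainF"),
#         Progress("a", "i", "f", "e", 4, array([1000]), "FEs",
#                  array([5]), "plainF")]
# compute_single_ert(runs, 1)  # (100 + 200 + 300 + 1000) / 3 = 533.33...
# # ... which equals mean(100, 200, 300) + (1 / 3) * 1000 = 200 + 333.33...
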
@dataclass(frozen=True, init=False, order=False, eq=False)
class Ert(MultiRun2DData):
    """Estimate the Expected Running Time (ERT)."""

    #: The ert function
    ert: np.ndarray

    def __init__(self,
                 algorithm: str | None,
                 instance: str | None,
                 objective: str | None,
                 encoding: str | None,
                 n: int,
                 time_unit: str,
                 f_name: str,
                 ert: np.ndarray):
        """
        Create the Ert function.

        :param algorithm: the algorithm name, if all runs are with the same
            algorithm
        :param instance: the instance name, if all runs are on the same
            instance
        :param objective: the objective name, if all runs are on the same
            objective function, `None` otherwise
        :param encoding: the encoding name, if all runs are on the same
            encoding and an encoding was actually used, `None` otherwise
        :param n: the total number of runs
        :param time_unit: the time unit
        :param f_name: the objective dimension name
        :param ert: the ert matrix
        """
        super().__init__(algorithm, instance, objective, encoding, n,
                         time_unit, f_name)
        if not isinstance(ert, np.ndarray):
            raise type_error(ert, "ert", np.ndarray)
        ert.flags.writeable = False

        f: Final[np.ndarray] = ert[:, 0]
        if not is_all_finite(f):
            raise ValueError(
                f"Ert x-axis must be all finite, but encountered {f}.")
        ll = f.size
        if (ll > 1) and np.any(f[1:] <= f[:-1]):
            raise ValueError("f data must be strictly increasing, "
                             f"but encountered {f}.")

        t: Final[np.ndarray] = ert[:, 1]
        if np.isfinite(t[0]) or np.isposinf(t[0]):
            if not is_all_finite(t[1:]):
                raise ValueError(
                    "non-first ert y-axis elements must be all finite.")
            if np.any(t[1:] >= t[:-1]):
                raise ValueError("t data must be strictly decreasing, "
                                 f"but encountered {t}.")
        else:
            raise ValueError(
                f"first ert y-axis element cannot be {t[0]}.")
        object.__setattr__(self, "ert", ert)
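
# Construction sketch (hypothetical values, commented out so that importing
# this module has no side effects): the matrix must have strictly increasing
# f-values in column 0 and strictly decreasing ERT values in column 1, where
# only the first ERT value may be infinite (goal never reached).
#
# matrix = np.array([[1.0, inf],     # goal f=1 was never reached
#                    [2.0, 30.0],    # goal f=2: ERT of 30 time units
#                    [3.0, 10.0]])   # goal f=3: ERT of 10 time units
# record = Ert("a", "i", "f", "e", 4, "FEs", "plainF", matrix)
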
def to_csv(ert: Ert, file: str,
           put_header: bool = True) -> Path:
    """
    Store an :class:`Ert` record in a CSV file.

    :param ert: the ERT
    :param file: the file to generate
    :param put_header: should we put a header with meta-data?
    :return: the fully resolved file name
    """
    if not isinstance(ert, Ert):
        raise type_error(ert, "ert", Ert)
    path: Final[Path] = Path(file)
    logger(f"Writing ERT to CSV file {path!r}.")
    path.ensure_parent_dir_exists()

    with path.open_for_write() as out:
        sep: Final[str] = CSV_SEPARATOR
        write: Callable[[str], int] = out.write
        if put_header:
            kv: Final[str] = KEY_VALUE_SEPARATOR
            cmt: Final[str] = COMMENT_START
            if ert.algorithm is not None:
                write(f"{cmt} {KEY_ALGORITHM}{kv}{ert.algorithm}\n")
            if ert.instance is not None:
                write(f"{cmt} {KEY_INSTANCE}{kv}{ert.instance}\n")
            if ert.objective is not None:
                write(f"{cmt} {KEY_OBJECTIVE_FUNCTION}{kv}"
                      f"{ert.objective}\n")
            if ert.encoding is not None:
                write(f"{cmt} {KEY_ENCODING}{kv}{ert.encoding}\n")
            write(f"{cmt} {KEY_N}{kv}{ert.n}\n")
        write(f"{ert.f_name}{sep}ert[{ert.time_unit}]\n")
        for v in ert.ert:
            write(f"{num_to_str(v[0])}{sep}{num_to_str(v[1])}\n")

    logger(f"Done writing ERT to CSV file {path!r}.")
    path.enforce_file()
    return path
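
# Usage sketch (hypothetical path and record, commented out so that importing
# this module has no side effects):
#
# out_path = to_csv(record, "ert_a_on_i.csv")
# # If put_header is True, the file starts with comment-prefixed meta-data
# # lines (algorithm, instance, objective, encoding, n), followed by a
# # column-header row and one f/ERT row per point of the ERT function.
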
def create(source: Iterable[Progress],
           f_lower_bound: int | float | Callable | None = None,
           use_default_lower_bounds: bool = True) -> Ert:
    """
    Create one single Ert record from an iterable of Progress records.

    :param source: the set of progress instances
    :param f_lower_bound: the lower bound for the objective value, or a
        callable that is applied to a progress object to get the lower
        bound
    :param use_default_lower_bounds: should we use the default lower
        bounds
    :return: the Ert record
    """
    if not isinstance(source, Iterable):
        raise type_error(source, "source", Iterable)

    lower_bound: int | float = inf
    if (f_lower_bound is not None) and (not callable(f_lower_bound)):
        if not isfinite(f_lower_bound):
            raise ValueError("f_lower_bound must be finite "
                             f"but is {f_lower_bound}.")
        lower_bound = f_lower_bound
        f_lower_bound = None

    algorithm: str | None = None
    instance: str | None = None
    objective: str | None = None
    encoding: str | None = None
    time_unit: str | None = None
    f_name: str | None = None
    f_list: list[np.ndarray] = []
    n: int = 0

    prgs: Final[list[Progress]] = cast("list[Progress]", source) \
        if isinstance(source, list) else list(source)

    for progress in prgs:
        if not isinstance(progress, Progress):
            raise type_error(progress, "progress input", Progress)
        if n <= 0:
            algorithm = progress.algorithm
            instance = progress.instance
            objective = progress.objective
            encoding = progress.encoding
            time_unit = progress.time_unit
            f_name = progress.f_name
        else:
            if algorithm != progress.algorithm:
                algorithm = None
            if instance != progress.instance:
                instance = None
            if objective != progress.objective:
                objective = None
            if encoding != progress.encoding:
                encoding = None
            if time_unit != progress.time_unit:
                raise ValueError(
                    f"Cannot mix time units {time_unit} "
                    f"and {progress.time_unit}.")
            if f_name != progress.f_name:
                raise ValueError(f"Cannot mix f-names {f_name} "
                                 f"and {progress.f_name}.")
        n += 1
        if use_default_lower_bounds and \
                (progress.f_standard is not None) and \
                (progress.f_name == F_NAME_RAW) and \
                (lower_bound > progress.f_standard):
            lower_bound = progress.f_standard
        if f_lower_bound is not None:
            lb = f_lower_bound(progress)
            if not isinstance(lb, int | float):
                raise type_error(lb, "computed lower bound", (int, float))
            if not isfinite(lb):
                raise ValueError(f"Invalid computed lower bound {lb}.")
            lower_bound = min(lower_bound, lb)
        f_list.append(progress.f)

    if n <= 0:
        raise ValueError("Did not encounter any progress information.")

    if use_default_lower_bounds:
        if f_name == F_NAME_SCALED:
            lower_bound = min(lower_bound, 1)
        elif f_name == F_NAME_NORMALIZED:
            lower_bound = min(lower_bound, 0)

    # get unique x-values and make sure that lower bound is included
    has_lb: Final[bool] = isfinite(lower_bound)
    if has_lb:
        f_list.append(np.array([lower_bound]))
    x = np.concatenate(f_list)
    del f_list
    x = np.unique(x)
    if has_lb:
        x = x[x >= lower_bound]
    base_len: Final[int] = x.size

    # prepare for backward iteration over arrays
    indices = np.array([(ppr.f.size - 1) for ppr in prgs],
                       dtype=DEFAULT_INT)
    y = np.empty(base_len, dtype=DEFAULT_FLOAT)

    for out_index in range(base_len):
        f_lim = x[out_index]
        found = 0
        time_sum: int = 0
        for r_idx, pr in enumerate(prgs):
            idx = indices[r_idx]
            f_vals = pr.f
            # did we fulfill the limit?
            if f_vals[idx] <= f_lim:
                found += 1
                # can we move time cursor back?
                if (idx > 0) and (f_vals[idx - 1] <= f_lim):
                    idx -= 1
                    indices[r_idx] = idx
            else:
                # condition not fulfilled, need to use maximum time
                idx = -1
            time_sum += int(pr.time[idx])

        # compute ert value: infinite if no run meets condition
        y[out_index] = inf if (found <= 0) else time_sum / found

    # convert the two arrays into one matrix
    ert = np.concatenate((x, y)).reshape((base_len, 2), order="F")

    return Ert(algorithm, instance, objective, encoding, n,
               time_unit, f_name, ert)
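
# Usage sketch (hypothetical data, commented out so that importing this
# module has no side effects): build one Ert record from a list of Progress
# objects; column 0 of record.ert then holds the goal f-values and column 1
# the corresponding ERT estimates.
#
# from numpy import array
# runs = [Progress("a", "i", "f", "e", 1, array([1, 4, 8]), "FEs",
#                  array([10, 8, 5]), "plainF"),
#         Progress("a", "i", "f", "e", 2, array([1, 3, 6]), "FEs",
#                  array([9, 7, 4]), "plainF")]
# record = create(runs)
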
def from_progresses(source: Iterable[Progress],
                    consumer: Callable[[Ert], Any],
                    f_lower_bound: float | None = None,
                    use_default_lower_bounds: bool = True,
                    join_all_algorithms: bool = False,
                    join_all_instances: bool = False,
                    join_all_objectives: bool = False,
                    join_all_encodings: bool = False) -> None:
    """
    Compute one or multiple ERTs from a stream of progress data.

    :param source: the set of progress instances
    :param f_lower_bound: the lower bound for the objective value
    :param use_default_lower_bounds: should we use the default lower
        bounds
    :param consumer: the destination to which the new records will be
        passed, can be the `append` method of a :class:`list`
    :param join_all_algorithms: should the Ert be aggregated over all
        algorithms?
    :param join_all_instances: should the Ert be aggregated over all
        instances?
    :param join_all_objectives: should the statistics be aggregated over
        all objective functions?
    :param join_all_encodings: should the statistics be aggregated over
        all encodings?
    """
    if not isinstance(source, Iterable):
        raise type_error(source, "source", Iterable)
    if not callable(consumer):
        raise type_error(consumer, "consumer", call=True)
    if not isinstance(join_all_algorithms, bool):
        raise type_error(join_all_algorithms, "join_all_algorithms", bool)
    if not isinstance(join_all_instances, bool):
        raise type_error(join_all_instances, "join_all_instances", bool)

    if (join_all_algorithms and join_all_instances and join_all_objectives
            and join_all_encodings):
        consumer(create(source, f_lower_bound, use_default_lower_bounds))
        return

    sorter: dict[tuple[str, str, str, str], list[Progress]] = {}
    for er in source:
        if not isinstance(er, Progress):
            raise type_error(er, "progress source", Progress)
        key = ("" if join_all_algorithms else er.algorithm,
               "" if join_all_instances else er.instance,
               "" if join_all_objectives else er.objective,
               "" if join_all_encodings else (
                   "" if er.encoding is None else er.encoding))
        if key in sorter:
            lst = sorter[key]
        else:
            lst = []
            sorter[key] = lst
        lst.append(er)

    if len(sorter) <= 0:
        raise ValueError("source must not be empty")

    if len(sorter) > 1:
        keyz = sorted(sorter.keys())
        for key in keyz:
            consumer(create(sorter[key], f_lower_bound,
                            use_default_lower_bounds))
    else:
        consumer(create(next(iter(sorter.values())), f_lower_bound,
                        use_default_lower_bounds))
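
# Usage sketch (hypothetical data, commented out so that importing this
# module has no side effects): collect one Ert per distinct combination of
# algorithm, instance, objective, and encoding into a list, or aggregate
# over all algorithms on each instance.
#
# results: list[Ert] = []
# from_progresses(runs, results.append)
# from_progresses(runs, results.append, join_all_algorithms=True)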