Coverage for moptipy / algorithms / so / vector / cmaes_lib.py: 92%
115 statements
« prev ^ index » next — coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
"""
Provides the CMA-ES Family Algorithms from the Library `cmaes`.

The Covariance Matrix Adaptation Evolutionary Strategy, CMA-ES for short, is a
very efficient optimization algorithm for small- and mid-scale and numerical/
continuous optimization problems.

Here, we wrap our `moptipy` API around the beautiful library `cmaes` by
Masashi Shibata and Masahiro Nomura at https://pypi.org/project/cmaes/. They
provide a CMA-ES implementation based on the ask-tell interface. In this
interface, you repeatedly query sample points in the search space from the
model and evaluate them. Then you feed back the points and their corresponding
objective values to the CMA-ES algorithm so that it can update its model. Then
the cycle is repeated.

1. Nikolaus Hansen and Andreas Ostermeier. A Completely Derandomized
   Self-Adaptation in Evolution Strategies. *Evolutionary Computation.*
   9(2):159-195. Summer 2001. https://dx.doi.org/10.1162/106365601750190398
2. Nikolaus Hansen. *The CMA Evolution Strategy: A Tutorial.*
   arXiv:1604.00772, 2016. https://arxiv.org/abs/1604.00772
3. Raymond Ros and Nikolaus Hansen. A Simple Modification in CMA-ES Achieving
   Linear Time and Space Complexity. In Günter Rudolph, Thomas Jansen, Nicola
   Beume, Simon Lucas, and Carlo Poloni, eds., Proceedings of the 10th
   International Conference on Parallel Problem Solving From Nature (PPSN X),
   September 13-17, 2008, Dortmund, Germany, pages 296-305. Volume 5199 of
   Lecture Notes in Computer Science. Berlin/Heidelberg, Germany: Springer.
   http://dx.doi.org/10.1007/978-3-540-87700-4_30
   https://hal.inria.fr/inria-00287367/document
4. Nikolaus Hansen. Benchmarking a BI-Population CMA-ES on the BBOB-2009
   Function Testbed. In Proceedings of the 11th Annual Conference Companion
   on Genetic and Evolutionary Computation Conference: Late Breaking Papers,
   July 8-12, 2009, Montreal, Québec, Canada, pages 2389-2396.
   New York, USA: ACM. http://dx.doi.org/10.1145/1570256.1570333
   https://hal.inria.fr/inria-00382093/document

- https://pypi.org/project/cmaes/
- https://github.com/CyberAgent/cmaes
"""
40from typing import Callable, Final
42import numpy as np
43from cmaes import CMA, SepCMA # type: ignore
44from numpy.random import Generator
45from pycommons.strings.string_conv import bool_or_num_to_str
46from pycommons.types import type_error
48from moptipy.api.algorithm import Algorithm
49from moptipy.api.process import Process
50from moptipy.spaces.vectorspace import VectorSpace
51from moptipy.utils.logger import CSV_SEPARATOR, KeyValueLogSection
54def _run_cma(cma: SepCMA | CMA,
55 f: Callable[[np.ndarray], int | float],
56 should_terminate: Callable[[], bool],
57 solutions: list[tuple[np.ndarray, int | float]],
58 run_criterion: Callable[[], bool] = lambda: False) -> int:
59 """
60 Run a CMA implementation from the `cmaes` library.
62 This is an internal core routine that translates the ask-tell interface
63 of the algorithm implementations in the `cmaes` library into a simple
64 loop.
66 :param cma: the algorithm instance
67 :param f: the objective function
68 :param should_terminate: the termination criterion
69 :param solutions: the internal list to store the solutions
70 :param run_criterion: the stopper for a run
71 :returns: the number of consumed FEs if the run was terminated by
72 `run_criterion`, `-1` otherwise
73 """
74 fes: int = 0
75 pop_size: Final[int] = cma.population_size
77 # now we load a lot of fast call function pointers
78 ask: Final[Callable[[], np.ndarray]] = cma.ask
79 append: Final[Callable[[
80 tuple[np.ndarray, int | float]], None]] = solutions.append
81 tell: Final[Callable[
82 [list[tuple[np.ndarray, float]]], None]] = cma.tell
83 clear: Final[Callable[[], None]] = solutions.clear
85 while True: # the main loop
86 clear() # clear the ask/tell records
87 for _ in range(pop_size):
88 if should_terminate(): # budget over?
89 return -1 # exit
90 x: np.ndarray = ask() # sample a point from CMA-ES
91 value: int | float = f(x) # compute its objective value
92 append((x, value)) # store the point
93 fes += 1
94 tell(solutions) # feed all results back to the CMA
95 if run_criterion():
96 return fes
class CMAES(Algorithm):
    """
    A wrapper for the `CMA` algorithm from `cmaes`.

    1. Nikolaus Hansen and Andreas Ostermeier. A Completely Derandomized
       Self-Adaptation in Evolution Strategies. *Evolutionary Computation.*
       9(2):159-195. Summer 2001.
       https://dx.doi.org/10.1162/106365601750190398
    2. Nikolaus Hansen. *The CMA Evolution Strategy: A Tutorial.*
       arXiv:1604.00772, 2016. https://arxiv.org/abs/1604.00772
    """

    def __init__(self, space: VectorSpace) -> None:
        """
        Create the CMAES algorithm.

        :param space: the vector space
        :raises TypeError: if `space` is not a :class:`VectorSpace`
        :raises ValueError: if the dimension of `space` is less than 2
        """
        super().__init__()
        if not isinstance(space, VectorSpace):
            raise type_error(space, "space", VectorSpace)
        if space.dimension <= 1:
            raise ValueError("CMA-ES only works on at least two dimensions.")
        #: the vector space defining the dimensions and bounds
        self.space: Final[VectorSpace] = space

    def solve(self, process: Process) -> None:
        """
        Apply the plain CMA-ES to an optimization problem.

        :param process: the black-box process object
        """
        f: Final[Callable[[np.ndarray], int | float]] = \
            self.space.clipped(process.evaluate)  # the clipped objective
        should_terminate: Final[Callable[[], bool]] = \
            process.should_terminate  # the termination criterion

        lb: Final[np.ndarray] = self.space.lower_bound  # the lower bound
        ub: Final[np.ndarray] = self.space.upper_bound  # the upper bound
        mean: Final[np.ndarray] = 0.5 * (lb + ub)  # use center as mean value
        sigma: Final[float] = 0.2 * max(ub - lb)  # use a large initial sigma
        bounds: Final[np.ndarray] = \
            np.stack((lb, ub)).transpose()  # construct bounds

        # we create and directly run the CMA-ES algorithm
        _run_cma(CMA(mean=mean, sigma=sigma, bounds=bounds,
                     seed=int(process.get_random().integers(0, 4294967296))),
                 f, should_terminate, [])

    def log_parameters_to(self, logger: KeyValueLogSection) -> None:
        """
        Log the parameters of the algorithm to a logger.

        :param logger: the logger for the parameters
        """
        super().log_parameters_to(logger)  # log algorithm/operator
        self.space.log_bounds(logger)  # log bounds

    def __str__(self) -> str:
        """
        Get the name of this optimization algorithm.

        :retval "cmaes_cmaes": always
        """
        return "cmaes_cmaes"
class SepCMAES(CMAES):
    """
    The Separable CMA-ES based on Class `SepCMA` from Library `cmaes`.

    This is a variant of the CMA-ES where the covariance matrix is
    constrained to be diagonal. This means that there are fewer parameters to
    learn, so the learning rate for the covariance matrix can be increased.
    This algorithm is suitable if the problem is of larger scale, i.e., has
    a high dimension, in which case the pure CMA-ES may become rather slow in
    terms of its runtime consumption. Then, the loss of solution quality
    resulting from the underlying assumption that the objective function is
    separable is acceptable versus the gain in speed. By learning only the
    diagonals of the covariance matrix, the implicit assumption is that there
    are no mutual influences between the different decision variables. Of
    course, if the optimization problem is already of that nature, i.e.,
    separable, the algorithm will be faster than the normal CMA-ES at the same
    solution quality.

    1. Raymond Ros and Nikolaus Hansen. A Simple Modification in CMA-ES
       Achieving Linear Time and Space Complexity. In Günter Rudolph,
       Thomas Jansen, Nicola Beume, Simon Lucas, and Carlo Poloni, eds.,
       Proceedings of the 10th International Conference on Parallel
       Problem Solving From Nature (PPSN X), September 13-17, 2008,
       Dortmund, Germany, pages 296-305. Volume 5199 of Lecture Notes in
       Computer Science. Berlin/Heidelberg, Germany: Springer.
       http://dx.doi.org/10.1007/978-3-540-87700-4_30
       https://hal.inria.fr/inria-00287367/document
    """

    def solve(self, process: Process) -> None:
        """
        Apply the separable CMA-ES version to an optimization problem.

        :param process: the optimization problem to solve
        """
        f: Final[Callable[[np.ndarray], int | float]] = \
            self.space.clipped(process.evaluate)  # the clipped objective
        should_terminate: Final[Callable[[], bool]] = \
            process.should_terminate  # the termination criterion

        lb: Final[np.ndarray] = self.space.lower_bound  # the lower bound
        ub: Final[np.ndarray] = self.space.upper_bound  # the upper bound
        mean: Final[np.ndarray] = 0.5 * (lb + ub)  # use center as mean value
        sigma: Final[float] = 0.2 * max(ub - lb)  # use a large initial sigma
        bounds: Final[np.ndarray] = \
            np.stack((lb, ub)).transpose()  # construct bounds

        # we create and directly run the separable CMA-ES algorithm
        _run_cma(SepCMA(mean=mean, sigma=sigma, bounds=bounds,
                        seed=int(process.get_random().integers(
                            0, 4294967296))),
                 f, should_terminate, [])

    def __str__(self) -> str:
        """
        Get the name of this optimization algorithm.

        :retval "sepCmaes_cmaes": always
        """
        return "sepCmaes_cmaes"
class BiPopCMAES(CMAES):
    """
    The bi-population CMA-ES based on Class `CMA` from Library `cmaes`.

    This algorithm combines two restart strategies for the normal CMA-ES under
    its hood. One where the population size increases exponentially and one
    where varying small population sizes are used.

    We here implement the bi-population CMA-ES algorithm in exactly the same
    way as the authors of the `cmaes` library do on
    https://pypi.org/project/cmaes/.

    1. Nikolaus Hansen. Benchmarking a BI-Population CMA-ES on the BBOB-2009
       Function Testbed. In Proceedings of the 11th Annual Conference
       Companion on Genetic and Evolutionary Computation Conference: Late
       Breaking Papers, July 8-12, 2009, Montreal, Québec, Canada,
       pages 2389-2396. New York, USA: ACM.
       http://dx.doi.org/10.1145/1570256.1570333
       https://hal.inria.fr/inria-00382093/document
    """

    def __init__(self, space: VectorSpace,
                 log_restarts: bool = False) -> None:
        """
        Create the bi-population CMA-ES algorithm.

        :param space: the vector space
        :param log_restarts: log the restart counters
        :raises TypeError: if `log_restarts` is not a `bool`
        """
        super().__init__(space)
        if not isinstance(log_restarts, bool):
            raise type_error(log_restarts, "log_restarts", bool)
        #: should we log the FEs when the restarts happen or not?
        self.log_restarts: Final[bool] = log_restarts

    def log_parameters_to(self, logger: KeyValueLogSection) -> None:
        """
        Log the parameters of the algorithm to a logger.

        :param logger: the logger for the parameters
        """
        super().log_parameters_to(logger)  # log algorithm/operator
        logger.key_value("logRestarts", self.log_restarts)

    def solve(self, process: Process) -> None:
        """
        Apply the external `cmaes` implementation to an optimization problem.

        :param process: the black-box process object
        """
        f: Final[Callable[[np.ndarray], int | float]] = \
            self.space.clipped(process.evaluate)  # the clipped objective
        should_terminate: Final[Callable[[], bool]] = \
            process.should_terminate  # the termination criterion
        # should we log the CMA-ES restart settings?
        restarts: Final[list[tuple[int, int, int, int, bool]] | None] = \
            [] if self.log_restarts and process.has_log() else None

        lb: Final[np.ndarray] = self.space.lower_bound  # the lower bound
        ub: Final[np.ndarray] = self.space.upper_bound  # the upper bound
        mean: Final[np.ndarray] = 0.5 * (lb + ub)  # use center as mean value
        sigma: Final[float] = 0.2 * max(ub - lb)  # use a large initial sigma
        bounds: Final[np.ndarray] = \
            np.stack((lb, ub)).transpose()  # construct bounds

        random: Generator = process.get_random()

        # create the initial CMA-ES setup
        seed: int = int(random.integers(0, 4294967296))
        cma = CMA(mean=mean, sigma=sigma, bounds=bounds, seed=seed)

        solutions: list[tuple[np.ndarray, int | float]] = []
        large_pop_restarts: int = 0  # the restarts with big population
        small_pop_fes: int = 0  # the FEs spent in the small population
        large_pop_fes: int = 0  # the FEs spent in the large population
        initial_pop_size: Final[int] = cma.population_size
        is_small_pop: bool = True  # are we in a small-population run?

        # The first run is with the "normal" population size. This is
        # the large population before the first doubling, but its FEs
        # count for the small population.
        while True:  # the main loop
            if restarts is not None:
                restarts.append((process.get_consumed_fes(),
                                 process.get_consumed_time_millis(),
                                 int(cma.population_size), seed,
                                 is_small_pop))
            fes = _run_cma(cma, f, should_terminate, solutions,
                           cma.should_stop)
            if fes < 0:  # this means that should_terminate became True
                break  # so we quit
            if is_small_pop:  # it was a small population, so increment
                small_pop_fes += fes  # the small-population-FEs
            else:  # it was a large population, so increment
                large_pop_fes += fes  # the large-population-FEs

            # We try to spend the same number of FEs in the small as in
            # the large population.
            is_small_pop = small_pop_fes < large_pop_fes

            if is_small_pop:  # create the small population
                pop_size_multiplier = 2 ** large_pop_restarts
                pop_size = max(1, int(
                    initial_pop_size * pop_size_multiplier ** (
                        random.uniform() ** 2)))
            else:  # else: create the large population
                large_pop_restarts += 1
                pop_size = initial_pop_size * (2 ** large_pop_restarts)

            # Create the new CMA-ES instance.
            seed = int(random.integers(0, 4294967296))
            cma = CMA(mean=mean, sigma=sigma, bounds=bounds,
                      population_size=pop_size,
                      seed=seed)

        if restarts is not None:  # write the log section
            log: Final[list[str]] = [
                f"fes{CSV_SEPARATOR}timeMillis{CSV_SEPARATOR}popSize"
                f"{CSV_SEPARATOR}seed{CSV_SEPARATOR}isSmall"]
            log.extend(CSV_SEPARATOR.join(map(
                bool_or_num_to_str, row)) for row in restarts)
            del restarts
            process.add_log_section("CMA_RESTARTS", "\n".join(log))
            del log

    def __str__(self) -> str:
        """
        Get the name of this optimization algorithm.

        :retval "biPopCmaes_cmaes": always
        """
        return "biPopCmaes_cmaes"