"""
Provides the CMA-ES Family Algorithms from the Library `cmaes`.
The Covariance Matrix Adaptation Evolutionary Strategy, CMA-ES for short, is a
very efficient optimization algorithm for small- and mid-scale and numerical/
continuous optimization problems.
Here, we wrap our `moptipy` API around the beautiful library `cmaes` by
Masashi Shibata and Masahiro Nomura at https://pypi.org/project/cmaes/. They
provide a CMA-ES implementation based on the ask-tell interface. In this
interface, you repeatedly query sample points in the search space from the
model and evaluate them. Then you feed back the points and their corresponding
objective values to the CMA-ES algorithm so that it can update its model. Then
the cycle is repeated.
1. Nikolaus Hansen and Andreas Ostermeier. A Completely Derandomized
Self-Adaptation in Evolution Strategies. *Evolutionary Computation.*
9(2):159-195. Summer 2001. https://dx.doi.org/10.1162/106365601750190398
2. Nikolaus Hansen. *The CMA Evolution Strategy: A Tutorial.*
arXiv:1604.00772, 2016. https://arxiv.org/abs/1604.00772
3. Raymond Ros and Nikolaus Hansen. A Simple Modification in CMA-ES Achieving
Linear Time and Space Complexity. In Günter Rudolph, Thomas Jansen, Nicola
Beume, Simon Lucas, and Carlo Poloni, eds., Proceedings of the 10th
International Conference on Parallel Problem Solving From Nature (PPSN X),
September 13-17, 2008, Dortmund, Germany, pages 296-305. Volume 5199 of
Lecture Notes in Computer Science. Berlin/Heidelberg, Germany: Springer.
http://dx.doi.org/10.1007/978-3-540-87700-4_30
https://hal.inria.fr/inria-00287367/document
4. Nikolaus Hansen. Benchmarking a BI-Population CMA-ES on the BBOB-2009
Function Testbed. In Proceedings of the 11th Annual Conference Companion
on Genetic and Evolutionary Computation Conference: Late Breaking Papers,
July 8-12, 2009, Montreal, Québec, Canada, pages 2389-2396.
New York, USA: ACM. http://dx.doi.org/10.1145/1570256.1570333
https://hal.inria.fr/inria-00382093/document
- https://pypi.org/project/cmaes/
- https://github.com/CyberAgent/cmaes
"""
from typing import Callable, Final
import numpy as np
from cmaes import CMA, SepCMA # type: ignore
from numpy.random import Generator
from pycommons.strings.string_conv import num_to_str
from pycommons.types import type_error
from moptipy.api.algorithm import Algorithm
from moptipy.api.process import Process
from moptipy.spaces.vectorspace import VectorSpace
from moptipy.utils.logger import CSV_SEPARATOR, KeyValueLogSection
def _run_cma(cma: SepCMA | CMA,
f: Callable[[np.ndarray], int | float],
should_terminate: Callable[[], bool],
solutions: list[tuple[np.ndarray, int | float]],
run_criterion: Callable[[], bool] = lambda: False) -> int:
"""
Run a CMA implementation from the `cmaes` library.
This is an internal core routine that translates the ask-tell interface
of the algorithm implementations in the `cmaes` library into a simple
loop.
:param cma: the algorithm instance
:param f: the objective function
:param should_terminate: the termination criterion
:param solutions: the internal list to store the solutions
:param run_criterion: the stopper for a run
:returns: the number of consumed FEs if the run was terminated by
`run_criterion`, `-1` otherwise
"""
fes: int = 0
pop_size: Final[int] = cma.population_size
# now we load a lot of fast call function pointers
ask: Final[Callable[[], np.ndarray]] = cma.ask
append: Final[Callable[[
tuple[np.ndarray, int | float]], None]] = solutions.append
tell: Final[Callable[
[list[tuple[np.ndarray, float]]], None]] = cma.tell
clear: Final[Callable[[], None]] = solutions.clear
while True: # the main loop
clear() # clear the ask/tell records
for _ in range(pop_size):
if should_terminate(): # budget over?
return -1 # exit
x: np.ndarray = ask() # sample a point from CMA-ES
value: int | float = f(x) # compute its objective value
append((x, value)) # store the point
fes = fes + 1
tell(solutions) # feed all results back to the CMA
if run_criterion():
return fes
[docs]
class CMAES(Algorithm):
"""
A wrapper for the `CMA` algorithm from `cmaes`.
1. Nikolaus Hansen and Andreas Ostermeier. A Completely Derandomized
Self-Adaptation in Evolution Strategies. *Evolutionary Computation.*
9(2):159-195. Summer 2001.
https://dx.doi.org/10.1162/106365601750190398
2. Nikolaus Hansen. *The CMA Evolution Strategy: A Tutorial.*
arXiv:1604.00772, 2016. https://arxiv.org/abs/1604.00772
"""
def __init__(self, space: VectorSpace) -> None:
"""
Create the CMAES algorithm.
:param space: the vector space
"""
super().__init__()
if not isinstance(space, VectorSpace):
raise type_error(space, "space", VectorSpace)
if space.dimension <= 1:
raise ValueError("CMA-ES only works on at least two dimensions.")
#: the vector space defining the dimensions and bounds
self.space: Final[VectorSpace] = space
[docs]
def solve(self, process: Process) -> None:
"""
Apply the bi-population CMA-ES to an optimization problem.
:param process: the black-box process object
"""
f: Final[Callable[[np.ndarray], int | float]] = \
self.space.clipped(process.evaluate) # the clipped objective
should_terminate: Final[Callable[[], bool]] = \
process.should_terminate # the termination criterion
lb: Final[np.ndarray] = self.space.lower_bound # the upper bound
ub: Final[np.ndarray] = self.space.upper_bound # the lower bound
mean: Final[np.ndarray] = 0.5 * (lb + ub) # use center as mean value
sigma: Final[float] = 0.2 * max(ub - lb) # use a large initial sigma
bounds: Final[np.ndarray] = \
np.stack((lb, ub)).transpose() # construct bounds
# we create and directly run the CMA-ES algorithm
_run_cma(CMA(mean=mean, sigma=sigma, bounds=bounds,
seed=process.get_random().integers(0, 4294967296)),
f, should_terminate, [])
[docs]
def log_parameters_to(self, logger: KeyValueLogSection) -> None:
"""
Log the parameters of the algorithm to a logger.
:param logger: the logger for the parameters
"""
super().log_parameters_to(logger) # log algorithm/operator
self.space.log_bounds(logger) # log bounds
def __str__(self):
"""
Get the name of this optimization algorithm.
:retval "cmaes_cmaes": always
"""
return "cmaes_cmaes"
[docs]
class SepCMAES(CMAES):
"""
The Separable CMA-ES based on Class `SepCMA` from Library `cmaes`.
This is a variant of the CMA-ES where the covariance matrix is
constrained to be diagonal. This means that there are fewer parameters to
learn, so the learning rate for the covariance matrix can be increased.
This algorithm is suitable if the problem is of larger scale, i.e., has
a high dimension, in which case the pure CMA-ES may become rather slow in
terms of its runtime consumption. Then, the loss of solution quality
resulting from the underlying assumption that the objective function is
separable is acceptable versus the gain in speed. By learning only the
diagonals of the covariance matrix, the implicit assumption is that there
are no mutual influences between the different decision variables. Of
course, if the optimization problem is already of that nature, i.e.,
separable, the algorithm will be faster than the normal CMA-ES at the same
solution quality.
1. Raymond Ros and Nikolaus Hansen. A Simple Modification in CMA-ES
Achieving Linear Time and Space Complexity. In Günter Rudolph,
Thomas Jansen, Nicola Beume, Simon Lucas, and Carlo Poloni, eds.,
Proceedings of the 10th International Conference on Parallel
Problem Solving From Nature (PPSN X), September 13-17, 2008,
Dortmund, Germany, pages 296-305. Volume 5199 of Lecture Notes in
Computer Science. Berlin/Heidelberg, Germany: Springer.
http://dx.doi.org/10.1007/978-3-540-87700-4_30
https://hal.inria.fr/inria-00287367/document
"""
[docs]
def solve(self, process: Process) -> None:
"""
Apply the separable CMA-ES version to an optimization problem.
:param process: the optimization problem to solve
"""
f: Final[Callable[[np.ndarray], int | float]] = \
self.space.clipped(process.evaluate) # the clipped objective
should_terminate: Final[Callable[[], bool]] = \
process.should_terminate # the termination criterion
lb: Final[np.ndarray] = self.space.lower_bound # the upper bound
ub: Final[np.ndarray] = self.space.upper_bound # the lower bound
mean: Final[np.ndarray] = 0.5 * (lb + ub) # use center as mean value
sigma: Final[float] = 0.2 * max(ub - lb) # use a large initial sigma
bounds: Final[np.ndarray] = \
np.stack((lb, ub)).transpose() # construct bounds
# we create and directly run the CMA-ES algorithm
_run_cma(SepCMA(mean=mean, sigma=sigma, bounds=bounds,
seed=process.get_random().integers(0, 4294967296)),
f, should_terminate, [])
def __str__(self):
"""
Get the name of this optimization algorithm.
:retval "sepCmaes_cmaes": always
"""
return "sepCmaes_cmaes"
[docs]
class BiPopCMAES(CMAES):
"""
The bi-population CMA-ES based on Class `CMA` from Library `cmaes`.
This algorithm combines two restart strategies for the normal CMA-ES under
its hood. One where the population size increases exponentially and one
where varying small population sizes are used.
We here implement the bi-population CMA-ES algorithm in exactly the same
way as the authors of the `cmaes` library do on
https://pypi.org/project/cmaes/.
1. Nikolaus Hansen. Benchmarking a BI-Population CMA-ES on the BBOB-2009
Function Testbed. In Proceedings of the 11th Annual Conference
Companion on Genetic and Evolutionary Computation Conference: Late
Breaking Papers, July 8-12, 2009, Montreal, Québec, Canada,
pages 2389-2396. New York, USA: ACM.
http://dx.doi.org/10.1145/1570256.1570333
https://hal.inria.fr/inria-00382093/document
"""
def __init__(self, space: VectorSpace,
log_restarts: bool = False) -> None:
"""
Create the CMAES algorithm.
:param space: the vector space
:param log_restarts: log the restart counters
"""
super().__init__(space)
if not isinstance(log_restarts, bool):
raise type_error(log_restarts, "log_restarts", bool)
#: should we log the FEs when the restarts happen or not?
self.log_restarts: Final[bool] = log_restarts
[docs]
def log_parameters_to(self, logger: KeyValueLogSection) -> None:
"""
Log the parameters of the algorithm to a logger.
:param logger: the logger for the parameters
"""
super().log_parameters_to(logger) # log algorithm/operator
logger.key_value("logRestarts", self.log_restarts)
[docs]
def solve(self, process: Process) -> None:
"""
Apply the external `cmaes` implementation to an optimization problem.
:param process: the black-box process object
"""
f: Final[Callable[[np.ndarray], int | float]] = \
self.space.clipped(process.evaluate) # the clipped objective
should_terminate: Final[Callable[[], bool]] = \
process.should_terminate # the termination criterion
# should we log the CMA-ES restart settings?
restarts: Final[list[tuple[int, int, int, int, bool]] | None] = \
[] if self.log_restarts and process.has_log() else None
lb: Final[np.ndarray] = self.space.lower_bound # the upper bound
ub: Final[np.ndarray] = self.space.upper_bound # the lower bound
mean: Final[np.ndarray] = 0.5 * (lb + ub) # use center as mean value
sigma: Final[float] = 0.2 * max(ub - lb) # use a large initial sigma
bounds: Final[np.ndarray] = \
np.stack((lb, ub)).transpose() # construct bounds
random: Generator = process.get_random()
# create the initial CMA-ES setup
seed: int = random.integers(0, 4294967296)
cma = CMA(mean=mean, sigma=sigma, bounds=bounds, seed=seed)
solutions: list[tuple[np.ndarray, int | float]] = []
large_pop_restarts: int = 0 # the restarts with big population
small_pop_fes: int = 0 # the FEs spent in the small population
large_pop_fes: int = 0 # the FEs spent in the large population
initial_pop_size: Final[int] = cma.population_size
is_small_pop: bool = True # are we in a small-population run?
# The first run is with the "normal" population size. This is
# the large population before the first doubling, but its FEs
# count for the small population.
while True: # the main loop
if restarts is not None:
restarts.append((process.get_consumed_fes(),
process.get_consumed_time_millis(),
cma.population_size, seed, is_small_pop))
fes = _run_cma(cma, f, should_terminate, solutions,
cma.should_stop)
if fes < 0: # this means that should_terminate became True
break # so we quit
if is_small_pop: # it was a small population so increment
small_pop_fes += fes # the small-population-FEs
else: # it was a large population, so increment the
large_pop_fes += fes # the large-population-FEs
# We try to spend the same number FEs in small as in the large
# population.
is_small_pop = small_pop_fes < large_pop_fes
if is_small_pop: # create the small population
pop_size_multiplier = 2 ** large_pop_restarts
pop_size = max(1, int(
initial_pop_size * pop_size_multiplier ** (
random.uniform() ** 2)))
else: # else: create the large population
large_pop_restarts = large_pop_restarts + 1
pop_size = initial_pop_size * (2 ** large_pop_restarts)
# Create the new CMA-ES instance.
seed = random.integers(0, 4294967296)
cma = CMA(mean=mean, sigma=sigma, bounds=bounds,
population_size=pop_size,
seed=seed)
if restarts is not None: # write the log section
log: Final[list[str]] = [
f"fes{CSV_SEPARATOR}timeMillis{CSV_SEPARATOR}popSize"
f"{CSV_SEPARATOR}seed{CSV_SEPARATOR}isSmall"]
for row in restarts:
log.append(CSV_SEPARATOR.join(map(
num_to_str, (x for x in row))))
del restarts
process.add_log_section("CMA_RESTARTS", "\n".join(log))
del log
def __str__(self):
"""
Get the name of this optimization algorithm.
:retval "biPopCmaes_cmaes": always
"""
return "biPopCmaes_cmaes"