Source code for moptipy.mo.archive.keep_farthest
"""A multi-objective archive pruner based on distance."""
from collections import Counter
from math import inf
from typing import Final, Iterable
import numpy as np
from numpy.linalg import norm
from pycommons.types import check_int_range, type_error
from moptipy.api.mo_archive import MOArchivePruner, MORecord
from moptipy.api.mo_problem import MOProblem, check_mo_problem
from moptipy.utils.nputils import DEFAULT_FLOAT, is_np_int
[docs]
class KeepFarthest(MOArchivePruner):
"""
Keep the best records of selected dimensions and those farthest from them.
This archive pruner proceeds in two steps:
First, it keeps the records that minimize a user-provided set of
dimensions.
Second, it iteratively adds the records to the selection which have the
maximum minimal distance to the already selected ones in terms of the
square distance over the normalized dimensions.
This should provide some sort of diversity in the archive while allowing
the user to preserve optima in certain dimensions.
"""
def __init__(self, problem: MOProblem,
keep_best_of_dimension: Iterable[int] | None = None):
"""
Create the distance-based pruner.
:param problem: the multi-objective optimization problem
:param keep_best_of_dimension: the dimensions of which we will always
keep the best record, `None` (=default) for all dimensions
"""
super().__init__()
check_mo_problem(problem)
dimension: Final[int] = check_int_range(
problem.f_dimension(), "dimension", 1, 1_000)
if keep_best_of_dimension is None:
keep_best_of_dimension = range(dimension)
if not isinstance(keep_best_of_dimension, Iterable):
raise type_error(keep_best_of_dimension,
"keep_best_of_dimension", Iterable)
dt: np.dtype = problem.f_dtype()
pinf: np.number
ninf: np.number
if is_np_int(dt):
ii = np.iinfo(dt)
pinf = dt.type(ii.max)
ninf = dt.type(ii.min)
else:
pinf = dt.type(inf)
ninf = dt.type(-inf)
#: the array for minima
self.__min: Final[np.ndarray] = np.empty(dimension, dt)
#: the array for maxima
self.__max: Final[np.ndarray] = np.empty(dimension, dt)
#: the initial number for minimum searching
self.__pinf: Final[np.number] = pinf
#: the initial number for maximum searching
self.__ninf: Final[np.number] = ninf
#: the list of items to preserve per dimension
self.__preserve: Final[list[set | None]] = [None] * dimension
for d in keep_best_of_dimension:
self.__preserve[d] = set()
#: the list of all items to preserve
self.__all_preserve: Final[list[set]] = []
for p in self.__preserve:
if p is not None:
self.__all_preserve.append(p)
if len(self.__all_preserve) <= 0:
raise ValueError(
f"there must be at least one dimension of which we keep the "
f"best, but keep_best_of_dimension={keep_best_of_dimension}!")
#: the counter for keeping the best
self.__counter: Final[Counter] = Counter()
#: the chosen elements to keep
self.__chosen: Final[list[int]] = []
#: the minimal distances
self.__min_dists: np.ndarray = np.empty(8, DEFAULT_FLOAT)
[docs]
def prune(self, archive: list[MORecord], n_keep: int, size: int) -> None:
"""
Preserve the best of certain dimensions and keep the rest diverse.
:param archive: the archive, i.e., a list of tuples of solutions and
their objective vectors
:param n_keep: the number of solutions to keep
:param size: the current size of the archive
"""
if size <= n_keep:
return
# set up basic variables
mi: Final[np.ndarray] = self.__min
mi.fill(self.__pinf)
ma: Final[np.ndarray] = self.__max
ma.fill(self.__ninf)
dim: Final[int] = len(mi)
preserve: Final[list[set | None]] = self.__preserve
all_preserve: Final[list[set]] = self.__all_preserve
for p in all_preserve:
p.clear()
counter: Final[Counter] = self.__counter
counter.clear()
chosen: Final[list[int]] = self.__chosen
chosen.clear()
min_dists: np.ndarray = self.__min_dists
mdl: int = len(min_dists)
if mdl < size:
self.__min_dists = min_dists = np.full(size, inf, DEFAULT_FLOAT)
else:
min_dists.fill(inf)
# get the ranges of the dimension and remember the record with
# the minimal value per dimension
for idx, ind in enumerate(archive):
fs: np.ndarray = ind.fs
for i, f in enumerate(fs):
if f <= mi[i]:
q: set[int] | None = preserve[i]
if f < mi[i]:
mi[i] = f
if q is not None:
q.clear()
if q is not None:
q.add(idx)
if f > ma[i]:
ma[i] = f
# the number of selected records be 0 at the beginning
selected: int = 0
# In a first step, we want to keep the minimal elements of the
# selected objectives. Now there might be non-dominated records that
# are minimal in more than one objective. In this case, we prefer
# those that can satisfy several objectives. So we first check how
# many objectives are minimized by each minimal element. We then
# pick the one that satisfies most. We only pick one minimal element
# per objective.
# Count how often the records were selected for minimizing an objective
for p in all_preserve:
counter.update(p)
# Now pick the elements that minimize most objectives.
# We first sort keys for stability, then we sort based on the counter.
for maxc in sorted(sorted(counter.keys()), # noqa
key=lambda kk: -counter[kk]):
found: bool = False
for p in all_preserve:
if maxc in p: # If the objective can be minimized by the
p.clear() # element, we don't need to minimize it with
found = True # another element and can keep this one.
if found:
chosen.append(maxc)
chosen.sort() # Sort by index: We swap the selected records forward
# Now preserve the selected records by moving them to the front.
for choseni in chosen:
archive.insert(selected, archive.pop(choseni))
selected = selected + 1
if selected >= n_keep:
return
# Now we have the elements that minimize the selected dimensions.
# Now we prepare the distances to ensure that we do not get any
# overflow when normalizing them.
for i in range(dim):
maa = ma[i]
if maa >= self.__pinf:
raise ValueError(f"maximum of dimension {i} is {maa}")
mii = mi[i]
if mii <= self.__ninf:
raise ValueError(f"minimum of dimension {i} is {mii}")
if maa < mii:
raise ValueError(
f"minimum of dimension {i} is {mii} and maximum is {maa}")
ma[i] = 1 if maa <= mii else maa - mii # ensure finite on div=0
# Now we fill up the archive with those records most different from
# the already included ones based on the square distance in the
# normalized dimensions. In each iteration, we compute the minimal
# normalized distance of each element to the already-selected ones.
# We then keep the one which has the largest minimal distance and add
# it to the selection.
dist_update_start: int = 0
while selected < n_keep: # until we have selected sufficiently many
max_dist: float = -inf # the maximum distance to be selected
max_dist_idx: int = selected # the index of that record
for rec_idx in range(selected, size): # iterate over unselected
min_dist_rec: float = min_dists[rec_idx] # min dist so far
rec: np.ndarray = archive[rec_idx].fs # objective vec
for cmp_idx in range(dist_update_start, selected):
cmp: np.ndarray = archive[cmp_idx].fs # objective vector
dst: float = float(norm((cmp - rec) / ma))
min_dist_rec = min(min_dist_rec, dst)
min_dists[rec_idx] = min_dist_rec # remember
if min_dist_rec > max_dist: # keep record with largest
max_dist = min_dist_rec # normalized distance
max_dist_idx = rec_idx # remember its index
# swap the record to the front of the archive
archive[selected], archive[max_dist_idx] = \
archive[max_dist_idx], archive[selected]
# preserve the distance of the element swapped back
min_dists[max_dist_idx] = min_dists[selected]
dist_update_start = selected
selected = selected + 1
def __str__(self):
"""
Get the name of this pruning strategy.
:returns: always `"keepFarthest"`
"""
return "keepFarthest"