Coverage for moptipy / mo / archive / keep_farthest.py: 96%
122 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""A multi-objective archive pruner based on distance."""
3from collections import Counter
4from math import inf
5from typing import Final, Iterable
7import numpy as np
8from numpy.linalg import norm
9from pycommons.types import check_int_range, type_error
11from moptipy.api.mo_archive import MOArchivePruner, MORecord
12from moptipy.api.mo_problem import MOProblem, check_mo_problem
13from moptipy.utils.nputils import DEFAULT_FLOAT, is_np_int
class KeepFarthest(MOArchivePruner):
    """
    Keep the best records of selected dimensions and those farthest from them.

    This archive pruner proceeds in two steps:

    First, it keeps the records that minimize a user-provided set of
    dimensions.

    Second, it iteratively adds the records to the selection which have the
    maximum minimal distance to the already selected ones in terms of the
    Euclidean distance over the normalized dimensions.

    This should provide some sort of diversity in the archive while allowing
    the user to preserve optima in certain dimensions.
    """

    def __init__(self, problem: MOProblem,
                 keep_best_of_dimension: Iterable[int] | None = None):
        """
        Create the distance-based pruner.

        :param problem: the multi-objective optimization problem; it is
            passed through `check_mo_problem` and provides the number of
            objectives (`f_dimension()`) and their dtype (`f_dtype()`)
        :param keep_best_of_dimension: the dimensions of which we will always
            keep the best record, `None` (=default) for all dimensions;
            each value is used as an index into a list with one slot per
            objective dimension
        :raises ValueError: if `keep_best_of_dimension` selects no dimension
        """
        super().__init__()

        check_mo_problem(problem)
        dimension: Final[int] = check_int_range(
            problem.f_dimension(), "dimension", 1, 1_000)
        if keep_best_of_dimension is None:
            keep_best_of_dimension = range(dimension)
        if not isinstance(keep_best_of_dimension, Iterable):
            raise type_error(keep_best_of_dimension,
                             "keep_best_of_dimension", Iterable)

        # The start values for the minimum / maximum search must be the
        # largest / smallest value representable in the objective dtype:
        # the integer limits for integer dtypes, +/- infinity otherwise.
        dt: np.dtype = problem.f_dtype()
        pinf: np.number
        ninf: np.number
        if is_np_int(dt):
            ii = np.iinfo(dt)
            pinf = dt.type(ii.max)
            ninf = dt.type(ii.min)
        else:
            pinf = dt.type(inf)
            ninf = dt.type(-inf)

        #: the array for minima
        self.__min: Final[np.ndarray] = np.empty(dimension, dt)
        #: the array for maxima
        self.__max: Final[np.ndarray] = np.empty(dimension, dt)
        #: the initial number for minimum searching
        self.__pinf: Final[np.number] = pinf
        #: the initial number for maximum searching
        self.__ninf: Final[np.number] = ninf
        #: the list of items to preserve per dimension
        self.__preserve: Final[list[set | None]] = [None] * dimension
        for d in keep_best_of_dimension:
            self.__preserve[d] = set()
        #: the list of all items to preserve
        self.__all_preserve: Final[list[set]] = [
            p for p in self.__preserve if p is not None]
        if not self.__all_preserve:
            raise ValueError(
                f"there must be at least one dimension of which we keep the "
                f"best, but keep_best_of_dimension={keep_best_of_dimension}!")
        #: the counter for keeping the best
        self.__counter: Final[Counter] = Counter()
        #: the chosen elements to keep
        self.__chosen: Final[list[int]] = []
        #: the minimal distances
        self.__min_dists: np.ndarray = np.empty(8, DEFAULT_FLOAT)

    def prune(self, archive: list[MORecord], n_keep: int, size: int) -> None:
        """
        Preserve the best of certain dimensions and keep the rest diverse.

        The records to keep are moved to the first `n_keep` positions of
        `archive`. Only the first `size` records of `archive` are considered;
        anything stored beyond index `size - 1` is ignored. If
        `size <= n_keep`, the archive is left untouched.

        :param archive: the archive, i.e., a list of tuples of solutions and
            their objective vectors
        :param n_keep: the number of solutions to keep
        :param size: the current size of the archive
        :raises ValueError: if the minimum or maximum found for a dimension
            equals the start value of the search (e.g., is infinite) or if
            the maximum is less than the minimum
        """
        if size <= n_keep:
            return

        # set up basic variables
        pinf: Final[np.number] = self.__pinf
        ninf: Final[np.number] = self.__ninf
        mi: Final[np.ndarray] = self.__min
        mi.fill(pinf)
        ma: Final[np.ndarray] = self.__max
        ma.fill(ninf)
        dim: Final[int] = len(mi)
        preserve: Final[list[set | None]] = self.__preserve
        all_preserve: Final[list[set]] = self.__all_preserve
        for p in all_preserve:
            p.clear()
        counter: Final[Counter] = self.__counter
        counter.clear()
        chosen: Final[list[int]] = self.__chosen
        chosen.clear()
        # The distance buffer is re-used between calls and only re-allocated
        # if it is too small for the current archive size.
        min_dists: np.ndarray = self.__min_dists
        if len(min_dists) < size:
            self.__min_dists = min_dists = np.full(size, inf, DEFAULT_FLOAT)
        else:
            min_dists.fill(inf)

        # Get the ranges of the dimension and remember the record with
        # the minimal value per dimension. We only look at the first `size`
        # records, exactly like the distance-based selection below does, so
        # that entries stored behind the valid part of the list can neither
        # distort the ranges nor be selected.
        for idx in range(size):
            fs: np.ndarray = archive[idx].fs
            for i, f in enumerate(fs):
                if f <= mi[i]:
                    q: set[int] | None = preserve[i]
                    if f < mi[i]:  # strictly better: forget older minima
                        mi[i] = f
                        if q is not None:
                            q.clear()
                    if q is not None:
                        q.add(idx)
                ma[i] = max(f, ma[i])

        # the number of selected records be 0 at the beginning
        selected: int = 0

        # In a first step, we want to keep the minimal elements of the
        # selected objectives. Now there might be non-dominated records that
        # are minimal in more than one objective. In this case, we prefer
        # those that can satisfy several objectives. So we first check how
        # many objectives are minimized by each minimal element. We then
        # pick the one that satisfies most. We only pick one minimal element
        # per objective.

        # Count how often the records were selected for minimizing an objective
        for p in all_preserve:
            counter.update(p)

        # Now pick the elements that minimize most objectives.
        # Records with higher counts come first, ties are broken by the
        # smaller record index to keep the result deterministic.
        for maxc in sorted(counter, key=lambda kk: (-counter[kk], kk)):
            found: bool = False
            for p in all_preserve:
                if maxc in p:  # If the objective can be minimized by the
                    p.clear()  # element, we don't need to minimize it with
                    found = True  # another element and can keep this one.
            if found:
                chosen.append(maxc)
        chosen.sort()  # Sort by index: We swap the selected records forward

        # Now preserve the selected records by moving them to the front.
        # Since the indices are processed in ascending order, moving one
        # record forward never changes the position of a later chosen one.
        for choseni in chosen:
            archive.insert(selected, archive.pop(choseni))
            selected += 1
            if selected >= n_keep:
                return

        # Now we have the elements that minimize the selected dimensions.
        # Now we prepare the distances to ensure that we do not get any
        # overflow when normalizing them.
        for i in range(dim):
            maa = ma[i]
            if maa >= pinf:
                raise ValueError(f"maximum of dimension {i} is {maa}")
            mii = mi[i]
            if mii <= ninf:
                raise ValueError(f"minimum of dimension {i} is {mii}")
            if maa < mii:
                raise ValueError(
                    f"minimum of dimension {i} is {mii} and maximum is {maa}")
            # From here on, `ma` holds the range of each dimension.
            ma[i] = 1 if maa <= mii else maa - mii  # ensure finite on div=0

        # Now we fill up the archive with those records most different from
        # the already included ones based on the Euclidean distance in the
        # normalized dimensions. In each iteration, we compute the minimal
        # normalized distance of each element to the already-selected ones.
        # We then keep the one which has the largest minimal distance and add
        # it to the selection.
        # `dist_update_start` marks the first selected record whose distance
        # has not yet been folded into `min_dists`: all selected records in
        # the first round, only the most recently added one afterwards.
        dist_update_start: int = 0
        while selected < n_keep:  # until we have selected sufficiently many
            max_dist: float = -inf  # the maximum distance to be selected
            max_dist_idx: int = selected  # the index of that record
            for rec_idx in range(selected, size):  # iterate over unselected
                min_dist_rec: float = min_dists[rec_idx]  # min dist so far
                rec: np.ndarray = archive[rec_idx].fs  # objective vec
                for cmp_idx in range(dist_update_start, selected):
                    cmp: np.ndarray = archive[cmp_idx].fs  # objective vector
                    dst: float = float(norm((cmp - rec) / ma))
                    min_dist_rec = min(min_dist_rec, dst)
                min_dists[rec_idx] = min_dist_rec  # remember
                if min_dist_rec > max_dist:  # keep record with largest
                    max_dist = min_dist_rec  # normalized distance
                    max_dist_idx = rec_idx  # remember its index
            # swap the record to the front of the archive
            archive[selected], archive[max_dist_idx] = \
                archive[max_dist_idx], archive[selected]
            # preserve the distance of the element swapped back
            min_dists[max_dist_idx] = min_dists[selected]
            dist_update_start = selected
            selected += 1

    def __str__(self) -> str:
        """
        Get the name of this pruning strategy.

        :returns: always `"keepFarthest"`
        """
        return "keepFarthest"