Coverage for moptipy / mo / archive / keep_farthest.py: 96%

122 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1"""A multi-objective archive pruner based on distance.""" 

2 

3from collections import Counter 

4from math import inf 

5from typing import Final, Iterable 

6 

7import numpy as np 

8from numpy.linalg import norm 

9from pycommons.types import check_int_range, type_error 

10 

11from moptipy.api.mo_archive import MOArchivePruner, MORecord 

12from moptipy.api.mo_problem import MOProblem, check_mo_problem 

13from moptipy.utils.nputils import DEFAULT_FLOAT, is_np_int 

14 

15 

16class KeepFarthest(MOArchivePruner): 

17 """ 

18 Keep the best records of selected dimensions and those farthest from them. 

19 

20 This archive pruner proceeds in two steps: 

21 

22 First, it keeps the records that minimize a user-provided set of 

23 dimensions. 

24 

25 Second, it iteratively adds the records to the selection which have the 

26 maximum minimal distance to the already selected ones in terms of the 

27 square distance over the normalized dimensions. 

28 

29 This should provide some sort of diversity in the archive while allowing 

30 the user to preserve optima in certain dimensions. 

31 """ 

32 

33 def __init__(self, problem: MOProblem, 

34 keep_best_of_dimension: Iterable[int] | None = None): 

35 """ 

36 Create the distance-based pruner. 

37 

38 :param problem: the multi-objective optimization problem 

39 :param keep_best_of_dimension: the dimensions of which we will always 

40 keep the best record, `None` (=default) for all dimensions 

41 """ 

42 super().__init__() 

43 

44 check_mo_problem(problem) 

45 dimension: Final[int] = check_int_range( 

46 problem.f_dimension(), "dimension", 1, 1_000) 

47 if keep_best_of_dimension is None: 

48 keep_best_of_dimension = range(dimension) 

49 if not isinstance(keep_best_of_dimension, Iterable): 

50 raise type_error(keep_best_of_dimension, 

51 "keep_best_of_dimension", Iterable) 

52 

53 dt: np.dtype = problem.f_dtype() 

54 pinf: np.number 

55 ninf: np.number 

56 if is_np_int(dt): 

57 ii = np.iinfo(dt) 

58 pinf = dt.type(ii.max) 

59 ninf = dt.type(ii.min) 

60 else: 

61 pinf = dt.type(inf) 

62 ninf = dt.type(-inf) 

63 

64 #: the array for minima 

65 self.__min: Final[np.ndarray] = np.empty(dimension, dt) 

66 #: the array for maxima 

67 self.__max: Final[np.ndarray] = np.empty(dimension, dt) 

68 #: the initial number for minimum searching 

69 self.__pinf: Final[np.number] = pinf 

70 #: the initial number for maximum searching 

71 self.__ninf: Final[np.number] = ninf 

72 #: the list of items to preserve per dimension 

73 self.__preserve: Final[list[set | None]] = [None] * dimension 

74 for d in keep_best_of_dimension: 

75 self.__preserve[d] = set() 

76 #: the list of all items to preserve 

77 self.__all_preserve: Final[list[set]] = [] 

78 for p in self.__preserve: 

79 if p is not None: 

80 self.__all_preserve.append(p) 

81 if len(self.__all_preserve) <= 0: 

82 raise ValueError( 

83 f"there must be at least one dimension of which we keep the " 

84 f"best, but keep_best_of_dimension={keep_best_of_dimension}!") 

85 #: the counter for keeping the best 

86 self.__counter: Final[Counter] = Counter() 

87 #: the chosen elements to keep 

88 self.__chosen: Final[list[int]] = [] 

89 #: the minimal distances 

90 self.__min_dists: np.ndarray = np.empty(8, DEFAULT_FLOAT) 

91 

92 def prune(self, archive: list[MORecord], n_keep: int, size: int) -> None: 

93 """ 

94 Preserve the best of certain dimensions and keep the rest diverse. 

95 

96 :param archive: the archive, i.e., a list of tuples of solutions and 

97 their objective vectors 

98 :param n_keep: the number of solutions to keep 

99 :param size: the current size of the archive 

100 """ 

101 if size <= n_keep: 

102 return 

103 

104 # set up basic variables 

105 mi: Final[np.ndarray] = self.__min 

106 mi.fill(self.__pinf) 

107 ma: Final[np.ndarray] = self.__max 

108 ma.fill(self.__ninf) 

109 dim: Final[int] = len(mi) 

110 preserve: Final[list[set | None]] = self.__preserve 

111 all_preserve: Final[list[set]] = self.__all_preserve 

112 for p in all_preserve: 

113 p.clear() 

114 counter: Final[Counter] = self.__counter 

115 counter.clear() 

116 chosen: Final[list[int]] = self.__chosen 

117 chosen.clear() 

118 min_dists: np.ndarray = self.__min_dists 

119 mdl: int = len(min_dists) 

120 if mdl < size: 

121 self.__min_dists = min_dists = np.full(size, inf, DEFAULT_FLOAT) 

122 else: 

123 min_dists.fill(inf) 

124 

125 # get the ranges of the dimension and remember the record with 

126 # the minimal value per dimension 

127 for idx, ind in enumerate(archive): 

128 fs: np.ndarray = ind.fs 

129 for i, f in enumerate(fs): 

130 if f <= mi[i]: 

131 q: set[int] | None = preserve[i] 

132 if f < mi[i]: 

133 mi[i] = f 

134 if q is not None: 

135 q.clear() 

136 if q is not None: 

137 q.add(idx) 

138 ma[i] = max(f, ma[i]) 

139 

140 # the number of selected records be 0 at the beginning 

141 selected: int = 0 

142 

143 # In a first step, we want to keep the minimal elements of the 

144 # selected objectives. Now there might be non-dominated records that 

145 # are minimal in more than one objective. In this case, we prefer 

146 # those that can satisfy several objectives. So we first check how 

147 # many objectives are minimized by each minimal element. We then 

148 # pick the one that satisfies most. We only pick one minimal element 

149 # per objective. 

150 

151 # Count how often the records were selected for minimizing an objective 

152 for p in all_preserve: 

153 counter.update(p) 

154 

155 # Now pick the elements that minimize most objectives. 

156 # We first sort keys for stability, then we sort based on the counter. 

157 for maxc in sorted(sorted(counter.keys()), # noqa 

158 key=lambda kk: -counter[kk]): 

159 found: bool = False 

160 for p in all_preserve: 

161 if maxc in p: # If the objective can be minimized by the 

162 p.clear() # element, we don't need to minimize it with 

163 found = True # another element and can keep this one. 

164 if found: 

165 chosen.append(maxc) 

166 chosen.sort() # Sort by index: We swap the selected records forward 

167 

168 # Now preserve the selected records by moving them to the front. 

169 for choseni in chosen: 

170 archive.insert(selected, archive.pop(choseni)) 

171 selected += 1 

172 if selected >= n_keep: 

173 return 

174 

175 # Now we have the elements that minimize the selected dimensions. 

176 # Now we prepare the distances to ensure that we do not get any 

177 # overflow when normalizing them. 

178 for i in range(dim): 

179 maa = ma[i] 

180 if maa >= self.__pinf: 

181 raise ValueError(f"maximum of dimension {i} is {maa}") 

182 mii = mi[i] 

183 if mii <= self.__ninf: 

184 raise ValueError(f"minimum of dimension {i} is {mii}") 

185 if maa < mii: 

186 raise ValueError( 

187 f"minimum of dimension {i} is {mii} and maximum is {maa}") 

188 ma[i] = 1 if maa <= mii else maa - mii # ensure finite on div=0 

189 

190 # Now we fill up the archive with those records most different from 

191 # the already included ones based on the square distance in the 

192 # normalized dimensions. In each iteration, we compute the minimal 

193 # normalized distance of each element to the already-selected ones. 

194 # We then keep the one which has the largest minimal distance and add 

195 # it to the selection. 

196 dist_update_start: int = 0 

197 while selected < n_keep: # until we have selected sufficiently many 

198 max_dist: float = -inf # the maximum distance to be selected 

199 max_dist_idx: int = selected # the index of that record 

200 for rec_idx in range(selected, size): # iterate over unselected 

201 min_dist_rec: float = min_dists[rec_idx] # min dist so far 

202 rec: np.ndarray = archive[rec_idx].fs # objective vec 

203 for cmp_idx in range(dist_update_start, selected): 

204 cmp: np.ndarray = archive[cmp_idx].fs # objective vector 

205 dst: float = float(norm((cmp - rec) / ma)) 

206 min_dist_rec = min(min_dist_rec, dst) 

207 min_dists[rec_idx] = min_dist_rec # remember 

208 if min_dist_rec > max_dist: # keep record with largest 

209 max_dist = min_dist_rec # normalized distance 

210 max_dist_idx = rec_idx # remember its index 

211 # swap the record to the front of the archive 

212 archive[selected], archive[max_dist_idx] = \ 

213 archive[max_dist_idx], archive[selected] 

214 # preserve the distance of the element swapped back 

215 min_dists[max_dist_idx] = min_dists[selected] 

216 dist_update_start = selected 

217 selected += 1 

218 

219 def __str__(self): 

220 """ 

221 Get the name of this pruning strategy. 

222 

223 :returns: always `"keepFarthest"` 

224 """ 

225 return "keepFarthest"