Coverage for moptipy / algorithms / mo / nsga2.py: 97%

147 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1""" 

2An implementation of NSGA-II, a famous multi-objective optimization method. 

3 

4Here we implement the famous NSGA-II algorithm by Deb, Agrawal, Pratap, and 

5Meyarivan. 

6 

7We apply the following modifications to the computation of the crowding 

8distance: 

9First, we normalize each dimension based on its range in the population. This 

10may make sense because some objective functions may have huge ranges while 

11others have very small ranges and would then basically be ignored in the 

12crowding distance. 

13Also, we do not assign infinite crowding distances to border elements of 

14collapsed dimensions. In other words, if all elements have the same value of 

15one objective function, none of them would receive infinite crowding distance 

16for this objective. This also makes sense because in this case, sorting would 

17be rather random. 

18We also start by generating the 2*:attr:`NSGA2.pop_size` random solutions. 

19When selecting parents, we apply tournament selection with replacement. 

20 

211. Kalyanmoy Deb, Samir Agrawal, Amrit Pratap, and T. Meyarivan. A Fast 

22 Elitist Non-Dominated Sorting Genetic Algorithm for Multi-Objective 

23 Optimization: NSGA-II. In Proceedings of the International Conference 

24 on Parallel Problem Solving from Nature (PPSN VI), September 18-20, 2000, 

25 Paris, France, pages 849-858. Lecture Notes in Computer Science, 

26 volume 1917. ISBN 978-3-540-41056-0. Berlin, Heidelberg: Springer. 

27 https://doi.org/10.1007/3-540-45356-3_83. 

28 Also: KanGAL Report Number 200001. 

29 http://repository.ias.ac.in/83498/1/2-a.pdf. 

30 

31""" 

32from math import inf 

33from typing import Any, Callable, Final, cast 

34 

35import numpy as np 

36from numpy.random import Generator 

37from pycommons.types import check_int_range, type_error 

38 

39from moptipy.api.algorithm import Algorithm2 

40from moptipy.api.mo_algorithm import MOAlgorithm 

41from moptipy.api.mo_archive import MORecord 

42from moptipy.api.mo_process import MOProcess 

43from moptipy.api.operators import Op0, Op1, Op2 

44from moptipy.utils.logger import KeyValueLogSection 

45from moptipy.utils.strings import num_to_str_for_name 

46 

47 

48class _NSGA2Record(MORecord): 

49 """The NSGA-II specific internal record structure.""" 

50 

51 def __init__(self, x: Any, fs: np.ndarray) -> None: 

52 """ 

53 Create a multi-objective record. 

54 

55 :param x: the point in the search space 

56 :param fs: the vector of objective values 

57 """ 

58 super().__init__(x, fs) 

59 #: the list of dominated solutions 

60 self.dominates: Final[list[_NSGA2Record]] = [] 

61 #: the number of solutions this one is dominated by 

62 self.n_dominated_by: int = 0 

63 #: the pareto front index 

64 self.front: int = -1 

65 #: the crowding distance 

66 self.crowding_distance: float = 0.0 

67 

68 def __lt__(self, other): 

69 """ 

70 Compare according to domination front and crowding distance. 

71 

72 :param other: the other element to compare with 

73 :returns: `True` if an only if this solution has either a smaller 

74 Pareto front index (:attr:`front`) or the same front index 

75 and a larger crowding distance (:attr:`crowding_distance`) 

76 """ 

77 fs: Final[int] = self.front 

78 fo: Final[int] = other.front 

79 return (fs < fo) or ((fs == fo) and ( 

80 self.crowding_distance > other.crowding_distance)) 

81 

82 

83def _non_dominated_sorting( 

84 pop: list[_NSGA2Record], 

85 needed_at_least: int, 

86 domination: Callable[[np.ndarray, np.ndarray], int]) -> int: 

87 """ 

88 Perform the fast non-dominated sorting. 

89 

90 :param pop: the population 

91 :param needed_at_least: the number of elements we actually need 

92 :param domination: the domination relationship 

93 :returns: the end of the last relevant frontier, with 

94 `needed_at_least <= end` 

95 """ 

96 lp: Final[int] = len(pop) 

97 

98 # clear the domination record 

99 for rec in pop: 

100 rec.n_dominated_by = 0 

101 

102 # compute the domination 

103 for i in range(lp - 1): 

104 a = pop[i] 

105 # we only check each pair of solutions once 

106 for j in range(i + 1, lp): 

107 b = pop[j] 

108 d = domination(a.fs, b.fs) 

109 if d == -1: # if a dominates b 

110 a.dominates.append(b) # remember that it does 

111 b.n_dominated_by += 1 # increment b's domination count 

112 elif d == 1: # if b dominates a 

113 b.dominates.append(a) # remember that it does 

114 a.n_dominated_by += 1 # increment b's domination count 

115 # now compute the fronts 

116 done: int = 0 

117 front_start: int 

118 front: int = 0 

119 while True: # repeat until all records are done 

120 front_start = done # remember end of old front 

121 front += 1 # step front counter 

122 for i in range(done, lp): # process all records not yet in a front 

123 a = pop[i] # pick record at index i 

124 if a.n_dominated_by == 0: # if it belongs to the current front 

125 a.front = front # set its front counter 

126 pop[i], pop[done] = pop[done], a # swap it to the front end 

127 done += 1 # increment length of done records 

128 if done >= needed_at_least: # are we done with sorting? 

129 break # yes -> break loop and return 

130 # the new front has been built, it ranges from [old_done, done) 

131 for j in range(front_start, done): 

132 a = pop[j] 

133 for b in a.dominates: 

134 b.n_dominated_by -= 1 

135 a.dominates.clear() 

136 

137 # clear the remaining domination records 

138 for i in range(front_start, len(pop)): 

139 pop[i].dominates.clear() 

140 

141 # return the number of elements sorted 

142 return done 

143 

144 

145def _crowding_distances(pop: list[_NSGA2Record]) -> None: 

146 """ 

147 Compute the crowding distances. 

148 

149 This method works as in the original NSGA-II, but it normalizes each 

150 dimension based on their ranges in the population. This may make sense 

151 because some objective functions may have huge ranges while others have 

152 very small ranges and would then basically be ignored in the crowding 

153 distance. 

154 

155 Also, we do not assign infinite crowding distances to border elements of 

156 collapsed dimensions. In other words, if all elements have the same value 

157 of one objective function, none of them would receive infinite crowding 

158 distance for this objective. This also makes sense because in this case, 

159 sorting would be rather random. 

160 

161 :param pop: the population 

162 """ 

163 for rec in pop: # set all crowding distances to 0 

164 rec.crowding_distance = 0.0 

165 

166 dimensions: Final[int] = len(pop[0].fs) 

167 for dim in range(dimensions): 

168 pop.sort(key=cast("Callable[[_NSGA2Record], Any]", 

169 lambda r, d=dim: r.fs[d])) # sort by dimension 

170 first = pop[0] # get the record with the smallest value in the dim 

171 last = pop[-1] # get the record with the largest value in the dim 

172 a = first.fs[dim] # get smallest value of dimension 

173 b = last.fs[dim] # get largest value of dimension 

174 if a >= b: # if smallest >= largest, all are the same! 

175 continue # we can ignore the dimension if it has collapsed 

176 first.crowding_distance = inf # crowding dist of smallest = inf 

177 last.crowding_distance = inf # crowding dist of largest = inf 

178 div = b - a # get divisor for normalization 

179 if div <= 0: # if the range collapses due to numerical imprecision... 

180 continue # then we can also skip this dimension 

181 # ok, we do have non-zero range 

182 nex = pop[1] # for each element, we need those to the left and right 

183 for i in range(2, len(pop)): 

184 rec = nex # first: rec = pop[1], second: pop[2], .... 

185 nex = pop[i] # first: nex = pop[2], second: pop[3], .... 

186 rec.crowding_distance += (nex.fs[dim] - first.fs[dim]) / div 

187 first = rec # current rec now = rec to the left next 

188 

189 

190class NSGA2(Algorithm2, MOAlgorithm): 

191 """The NSGA-II algorithm by Deb, Agrawal, Pratap, and Meyarivan.""" 

192 

193 def __init__(self, op0: Op0, op1: Op1, op2: Op2, 

194 pop_size: int, cr: float) -> None: 

195 """ 

196 Create the NSGA-II algorithm. 

197 

198 :param op0: the nullary search operator 

199 :param op1: the unary search operator 

200 :param op2: the binary search operator 

201 :param pop_size: the population size 

202 :param cr: the crossover rate 

203 """ 

204 super().__init__( 

205 f"nsga2_{pop_size}_{num_to_str_for_name(cr)}", op0, op1, op2) 

206 #: the population size 

207 self.pop_size: Final[int] = check_int_range(pop_size, "pop_size", 3) 

208 if not isinstance(cr, float): 

209 raise type_error(cr, "cr", float) 

210 if not 0 < cr < 1: 

211 raise ValueError(f"cr={cr}, but we need 0<cr<1.") 

212 #: the crossover rate 

213 self.cr: Final[float] = cr 

214 

215 def solve_mo(self, process: MOProcess) -> None: 

216 """ 

217 Apply the MO-RLS to an optimization problem. 

218 

219 :param process: the black-box process object 

220 """ 

221 # initialize fast calls and local constants 

222 create_x: Final[Callable[[], Any]] = process.create 

223 create_f: Final[Callable[[], np.ndarray]] = process.f_create 

224 domination: Final[Callable[[np.ndarray, np.ndarray], int]] = \ 

225 process.f_dominates 

226 should_terminate: Final[Callable[[], bool]] = process.should_terminate 

227 random: Final[Generator] = process.get_random() 

228 op0: Final[Callable[[Generator, Any], None]] = self.op0.op0 

229 op1: Final[Callable[[Generator, Any, Any], None]] = self.op1.op1 

230 op2: Final[Callable[[Generator, Any, Any, Any], None]] = self.op2.op2 

231 evaluate: Final[Callable[[Any, np.ndarray], Any]] = process.f_evaluate 

232 ri: Final[Callable[[int], int]] = random.integers 

233 rd: Final[Callable[[], float]] = random.uniform 

234 cr: Final[float] = self.cr 

235 

236 # create first population 

237 pop_size: Final[int] = self.pop_size 

238 pop_size_2: Final[int] = pop_size + pop_size 

239 pop: list[_NSGA2Record] = [] 

240 

241 # create the population 

242 for _ in range(pop_size_2): 

243 if should_terminate(): 

244 process.check_in_all(pop) # check in population 

245 return 

246 rec: _NSGA2Record = _NSGA2Record(create_x(), create_f()) 

247 op0(random, rec.x) 

248 evaluate(rec.x, rec.fs) 

249 pop.append(rec) 

250 

251 while True: # the main loop 

252 # Perform non-dominated sorting and get at least pop_size 

253 # elements in different fronts. 

254 # "start" marks the begin of the last front that was created. 

255 # "end" marks the total number of elements sorted. 

256 # It holds that "start < pop_size <= end". 

257 end = _non_dominated_sorting(pop, pop_size, domination) 

258 # We only perform the crowding distance computation on the first 

259 # "end" elements in the population. 

260 # We take this slice of the population (or the whole population if 

261 # all elements are non-domination) and compute the crowding 

262 # distance. 

263 candidates = pop[:end] if end < pop_size_2 else pop 

264 _crowding_distances(candidates) 

265 # Careful: crowding distance computation will shuffle the list 

266 # candidates, because it sorts based on dimensions. Therefore, we 

267 # need to re-sort the whole slice. 

268 candidates.sort() 

269 if candidates is not pop: # write back if need be 

270 pop[:end] = candidates 

271 

272 # fill offspring population 

273 for ofs_idx in range(pop_size, pop_size_2): 

274 # check if we should leave before evaluation 

275 if should_terminate(): 

276 process.check_in_all(pop) 

277 return # quit 

278 ofs: _NSGA2Record = pop[ofs_idx] # offspring to overwrite 

279 # binary tournament with replacement for first parent 

280 p1: _NSGA2Record = pop[ri(pop_size)] 

281 palt: _NSGA2Record = pop[ri(pop_size)] 

282 p1 = min(p1, palt) 

283 

284 if rd() >= cr: # mutation: only 1 parent needed 

285 op1(random, ofs.x, p1.x) # do mutation 

286 else: # crossover: need a second parent 

287 # binary tournament with replacement for second paren 

288 # (who must be different from first parent) 

289 p2: _NSGA2Record = p1 

290 while p2 is p1: 

291 p2 = pop[ri(pop_size)] 

292 palt = p1 

293 while palt is p1: 

294 palt = pop[ri(pop_size)] 

295 p2 = min(p2, palt) 

296 # got two parents, now do crossover 

297 op2(random, ofs.x, p1.x, p2.x) 

298 evaluate(ofs.x, ofs.fs) # otherwise: evaluate 

299 

300 def log_parameters_to(self, logger: KeyValueLogSection) -> None: 

301 """ 

302 Log the parameters of the NSGA-II algorithm to a logger. 

303 

304 :param logger: the logger for the parameters 

305 """ 

306 super().log_parameters_to(logger) 

307 logger.key_value("popSize", self.pop_size) 

308 logger.key_value("cr", self.cr) 

309 

310 def initialize(self) -> None: 

311 """Initialize the algorithm.""" 

312 Algorithm2.initialize(self)