Coverage for moptipy / algorithms / mo / nsga2.py: 97%
147 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""
2An implementation of NSGA-II, a famous multi-objective optimization method.
4Here we implement the famous NSGA-II algorithm by Deb, Agrawal, Pratap, and
5Meyarivan.
7We apply the following modifications to the computation of the crowding
8distance:
9First, we normalize each dimension based on its range in the population. This
10may make sense because some objective functions may have huge ranges while
11others have very small ranges and would then basically be ignored in the
12crowding distance.
13Also, we do not assign infinite crowding distances to border elements of
14collapsed dimensions. In other words, if all elements have the same value of
15one objective function, none of them would receive infinite crowding distance
16for this objective. This also makes sense because in this case, sorting would
17be rather random.
18We also start by generating the 2*:attr:`NSGA2.pop_size` random solutions.
19When selecting parents, we apply tournament selection with replacement.
211. Kalyanmoy Deb, Samir Agrawal, Amrit Pratap, and T. Meyarivan. A Fast
22 Elitist Non-Dominated Sorting Genetic Algorithm for Multi-Objective
23 Optimization: NSGA-II. In Proceedings of the International Conference
24 on Parallel Problem Solving from Nature (PPSN VI), September 18-20, 2000,
25 Paris, France, pages 849-858. Lecture Notes in Computer Science,
26 volume 1917. ISBN 978-3-540-41056-0. Berlin, Heidelberg: Springer.
27 https://doi.org/10.1007/3-540-45356-3_83.
28 Also: KanGAL Report Number 200001.
29 http://repository.ias.ac.in/83498/1/2-a.pdf.
31"""
32from math import inf
33from typing import Any, Callable, Final, cast
35import numpy as np
36from numpy.random import Generator
37from pycommons.types import check_int_range, type_error
39from moptipy.api.algorithm import Algorithm2
40from moptipy.api.mo_algorithm import MOAlgorithm
41from moptipy.api.mo_archive import MORecord
42from moptipy.api.mo_process import MOProcess
43from moptipy.api.operators import Op0, Op1, Op2
44from moptipy.utils.logger import KeyValueLogSection
45from moptipy.utils.strings import num_to_str_for_name
48class _NSGA2Record(MORecord):
49 """The NSGA-II specific internal record structure."""
51 def __init__(self, x: Any, fs: np.ndarray) -> None:
52 """
53 Create a multi-objective record.
55 :param x: the point in the search space
56 :param fs: the vector of objective values
57 """
58 super().__init__(x, fs)
59 #: the list of dominated solutions
60 self.dominates: Final[list[_NSGA2Record]] = []
61 #: the number of solutions this one is dominated by
62 self.n_dominated_by: int = 0
63 #: the pareto front index
64 self.front: int = -1
65 #: the crowding distance
66 self.crowding_distance: float = 0.0
68 def __lt__(self, other):
69 """
70 Compare according to domination front and crowding distance.
72 :param other: the other element to compare with
73 :returns: `True` if an only if this solution has either a smaller
74 Pareto front index (:attr:`front`) or the same front index
75 and a larger crowding distance (:attr:`crowding_distance`)
76 """
77 fs: Final[int] = self.front
78 fo: Final[int] = other.front
79 return (fs < fo) or ((fs == fo) and (
80 self.crowding_distance > other.crowding_distance))
83def _non_dominated_sorting(
84 pop: list[_NSGA2Record],
85 needed_at_least: int,
86 domination: Callable[[np.ndarray, np.ndarray], int]) -> int:
87 """
88 Perform the fast non-dominated sorting.
90 :param pop: the population
91 :param needed_at_least: the number of elements we actually need
92 :param domination: the domination relationship
93 :returns: the end of the last relevant frontier, with
94 `needed_at_least <= end`
95 """
96 lp: Final[int] = len(pop)
98 # clear the domination record
99 for rec in pop:
100 rec.n_dominated_by = 0
102 # compute the domination
103 for i in range(lp - 1):
104 a = pop[i]
105 # we only check each pair of solutions once
106 for j in range(i + 1, lp):
107 b = pop[j]
108 d = domination(a.fs, b.fs)
109 if d == -1: # if a dominates b
110 a.dominates.append(b) # remember that it does
111 b.n_dominated_by += 1 # increment b's domination count
112 elif d == 1: # if b dominates a
113 b.dominates.append(a) # remember that it does
114 a.n_dominated_by += 1 # increment b's domination count
115 # now compute the fronts
116 done: int = 0
117 front_start: int
118 front: int = 0
119 while True: # repeat until all records are done
120 front_start = done # remember end of old front
121 front += 1 # step front counter
122 for i in range(done, lp): # process all records not yet in a front
123 a = pop[i] # pick record at index i
124 if a.n_dominated_by == 0: # if it belongs to the current front
125 a.front = front # set its front counter
126 pop[i], pop[done] = pop[done], a # swap it to the front end
127 done += 1 # increment length of done records
128 if done >= needed_at_least: # are we done with sorting?
129 break # yes -> break loop and return
130 # the new front has been built, it ranges from [old_done, done)
131 for j in range(front_start, done):
132 a = pop[j]
133 for b in a.dominates:
134 b.n_dominated_by -= 1
135 a.dominates.clear()
137 # clear the remaining domination records
138 for i in range(front_start, len(pop)):
139 pop[i].dominates.clear()
141 # return the number of elements sorted
142 return done
145def _crowding_distances(pop: list[_NSGA2Record]) -> None:
146 """
147 Compute the crowding distances.
149 This method works as in the original NSGA-II, but it normalizes each
150 dimension based on their ranges in the population. This may make sense
151 because some objective functions may have huge ranges while others have
152 very small ranges and would then basically be ignored in the crowding
153 distance.
155 Also, we do not assign infinite crowding distances to border elements of
156 collapsed dimensions. In other words, if all elements have the same value
157 of one objective function, none of them would receive infinite crowding
158 distance for this objective. This also makes sense because in this case,
159 sorting would be rather random.
161 :param pop: the population
162 """
163 for rec in pop: # set all crowding distances to 0
164 rec.crowding_distance = 0.0
166 dimensions: Final[int] = len(pop[0].fs)
167 for dim in range(dimensions):
168 pop.sort(key=cast("Callable[[_NSGA2Record], Any]",
169 lambda r, d=dim: r.fs[d])) # sort by dimension
170 first = pop[0] # get the record with the smallest value in the dim
171 last = pop[-1] # get the record with the largest value in the dim
172 a = first.fs[dim] # get smallest value of dimension
173 b = last.fs[dim] # get largest value of dimension
174 if a >= b: # if smallest >= largest, all are the same!
175 continue # we can ignore the dimension if it has collapsed
176 first.crowding_distance = inf # crowding dist of smallest = inf
177 last.crowding_distance = inf # crowding dist of largest = inf
178 div = b - a # get divisor for normalization
179 if div <= 0: # if the range collapses due to numerical imprecision...
180 continue # then we can also skip this dimension
181 # ok, we do have non-zero range
182 nex = pop[1] # for each element, we need those to the left and right
183 for i in range(2, len(pop)):
184 rec = nex # first: rec = pop[1], second: pop[2], ....
185 nex = pop[i] # first: nex = pop[2], second: pop[3], ....
186 rec.crowding_distance += (nex.fs[dim] - first.fs[dim]) / div
187 first = rec # current rec now = rec to the left next
190class NSGA2(Algorithm2, MOAlgorithm):
191 """The NSGA-II algorithm by Deb, Agrawal, Pratap, and Meyarivan."""
193 def __init__(self, op0: Op0, op1: Op1, op2: Op2,
194 pop_size: int, cr: float) -> None:
195 """
196 Create the NSGA-II algorithm.
198 :param op0: the nullary search operator
199 :param op1: the unary search operator
200 :param op2: the binary search operator
201 :param pop_size: the population size
202 :param cr: the crossover rate
203 """
204 super().__init__(
205 f"nsga2_{pop_size}_{num_to_str_for_name(cr)}", op0, op1, op2)
206 #: the population size
207 self.pop_size: Final[int] = check_int_range(pop_size, "pop_size", 3)
208 if not isinstance(cr, float):
209 raise type_error(cr, "cr", float)
210 if not 0 < cr < 1:
211 raise ValueError(f"cr={cr}, but we need 0<cr<1.")
212 #: the crossover rate
213 self.cr: Final[float] = cr
215 def solve_mo(self, process: MOProcess) -> None:
216 """
217 Apply the MO-RLS to an optimization problem.
219 :param process: the black-box process object
220 """
221 # initialize fast calls and local constants
222 create_x: Final[Callable[[], Any]] = process.create
223 create_f: Final[Callable[[], np.ndarray]] = process.f_create
224 domination: Final[Callable[[np.ndarray, np.ndarray], int]] = \
225 process.f_dominates
226 should_terminate: Final[Callable[[], bool]] = process.should_terminate
227 random: Final[Generator] = process.get_random()
228 op0: Final[Callable[[Generator, Any], None]] = self.op0.op0
229 op1: Final[Callable[[Generator, Any, Any], None]] = self.op1.op1
230 op2: Final[Callable[[Generator, Any, Any, Any], None]] = self.op2.op2
231 evaluate: Final[Callable[[Any, np.ndarray], Any]] = process.f_evaluate
232 ri: Final[Callable[[int], int]] = random.integers
233 rd: Final[Callable[[], float]] = random.uniform
234 cr: Final[float] = self.cr
236 # create first population
237 pop_size: Final[int] = self.pop_size
238 pop_size_2: Final[int] = pop_size + pop_size
239 pop: list[_NSGA2Record] = []
241 # create the population
242 for _ in range(pop_size_2):
243 if should_terminate():
244 process.check_in_all(pop) # check in population
245 return
246 rec: _NSGA2Record = _NSGA2Record(create_x(), create_f())
247 op0(random, rec.x)
248 evaluate(rec.x, rec.fs)
249 pop.append(rec)
251 while True: # the main loop
252 # Perform non-dominated sorting and get at least pop_size
253 # elements in different fronts.
254 # "start" marks the begin of the last front that was created.
255 # "end" marks the total number of elements sorted.
256 # It holds that "start < pop_size <= end".
257 end = _non_dominated_sorting(pop, pop_size, domination)
258 # We only perform the crowding distance computation on the first
259 # "end" elements in the population.
260 # We take this slice of the population (or the whole population if
261 # all elements are non-domination) and compute the crowding
262 # distance.
263 candidates = pop[:end] if end < pop_size_2 else pop
264 _crowding_distances(candidates)
265 # Careful: crowding distance computation will shuffle the list
266 # candidates, because it sorts based on dimensions. Therefore, we
267 # need to re-sort the whole slice.
268 candidates.sort()
269 if candidates is not pop: # write back if need be
270 pop[:end] = candidates
272 # fill offspring population
273 for ofs_idx in range(pop_size, pop_size_2):
274 # check if we should leave before evaluation
275 if should_terminate():
276 process.check_in_all(pop)
277 return # quit
278 ofs: _NSGA2Record = pop[ofs_idx] # offspring to overwrite
279 # binary tournament with replacement for first parent
280 p1: _NSGA2Record = pop[ri(pop_size)]
281 palt: _NSGA2Record = pop[ri(pop_size)]
282 p1 = min(p1, palt)
284 if rd() >= cr: # mutation: only 1 parent needed
285 op1(random, ofs.x, p1.x) # do mutation
286 else: # crossover: need a second parent
287 # binary tournament with replacement for second paren
288 # (who must be different from first parent)
289 p2: _NSGA2Record = p1
290 while p2 is p1:
291 p2 = pop[ri(pop_size)]
292 palt = p1
293 while palt is p1:
294 palt = pop[ri(pop_size)]
295 p2 = min(p2, palt)
296 # got two parents, now do crossover
297 op2(random, ofs.x, p1.x, p2.x)
298 evaluate(ofs.x, ofs.fs) # otherwise: evaluate
300 def log_parameters_to(self, logger: KeyValueLogSection) -> None:
301 """
302 Log the parameters of the NSGA-II algorithm to a logger.
304 :param logger: the logger for the parameters
305 """
306 super().log_parameters_to(logger)
307 logger.key_value("popSize", self.pop_size)
308 logger.key_value("cr", self.cr)
310 def initialize(self) -> None:
311 """Initialize the algorithm."""
312 Algorithm2.initialize(self)