Coverage for moptipy / algorithms / so / ffa / safea_n.py: 95%

80 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1""" 

2A Hybrid SA-FEA Algorithm: the `SAFEA-N`. 

3 

4This hybrid algorithm has the following features: 

5 

6- The new solution of the FEA strand is always copied to the SA strand. 

7- The new solution of the SA strand is copied over to the FEA strand if it is 

8 better than the current solution of the FEA strand. 

9- The H-table is updated by both strands. 

10- The FEA strand always toggles back to the SA strand. 

11- The SA strand toggles to the FEA strand if it did not improve for a certain, 

12 increasing `time limit`. 

13- Every time the SA strand toggles over to the FEA strand, `time_limit` is 

14 incremented by 1. 

15""" 

16from collections import Counter 

17from math import exp 

18from typing import Callable, Final 

19 

20from numpy.random import Generator 

21from pycommons.types import type_error 

22 

23from moptipy.algorithms.modules.temperature_schedule import TemperatureSchedule 

24from moptipy.algorithms.so.ffa.ffa_h import create_h, log_h 

25from moptipy.api.algorithm import Algorithm1 

26from moptipy.api.operators import Op0, Op1 

27from moptipy.api.process import Process 

28from moptipy.utils.logger import KeyValueLogSection 

29 

30 

31class SAFEAN(Algorithm1): 

32 """An implementation of the SAFEA-N.""" 

33 

34 def __init__(self, op0: Op0, op1: Op1, schedule: TemperatureSchedule, 

35 log_h_tbl: bool = False) -> None: 

36 """ 

37 Create the SAFEA-N. 

38 

39 :param op0: the nullary search operator 

40 :param op1: the unary search operator 

41 :param schedule: the temperature schedule to use 

42 :param log_h_tbl: should we log the H table? 

43 """ 

44 if not isinstance(schedule, TemperatureSchedule): 

45 raise type_error(schedule, "schedule", TemperatureSchedule) 

46 if not isinstance(log_h_tbl, bool): 

47 raise type_error(log_h_tbl, "log_h_tbl", bool) 

48 super().__init__(f"safeaN_{schedule}", op0, op1) 

49 #: True if we should log the H table, False otherwise 

50 self.__log_h_tbl: Final[bool] = log_h_tbl 

51 #: the temperature schedule 

52 self.schedule: Final[TemperatureSchedule] = schedule 

53 

54 def solve(self, process: Process) -> None: 

55 """ 

56 Apply the SAFEA-N to an optimization problem. 

57 

58 :param process: the black-box process object 

59 """ 

60 # Create records for old and new point in the search space. 

61 x_ea = process.create() # record for current solution of the SA 

62 x_fea = process.create() # record for current solution of the FEA 

63 x_new = process.create() # record for new solution 

64 

65 # Obtain the random number generator. 

66 random: Final[Generator] = process.get_random() 

67 

68 # Put function references in variables to save time. 

69 evaluate: Final[Callable] = process.evaluate # the objective 

70 should_terminate: Final[Callable] = process.should_terminate 

71 temperature: Final[Callable[[int], float]] = self.schedule.temperature 

72 r01: Final[Callable[[], float]] = random.random # random from [0, 1) 

73 xcopy: Final[Callable] = process.copy # copy(dest, source) 

74 op0: Final[Callable] = self.op0.op0 # the nullary operator 

75 op1: Final[Callable] = self.op1.op1 # the unary operator 

76 

77 h, ofs = create_h(process) # Allocate the h-table 

78 

79 # Start at a random point in the search space and evaluate it. 

80 op0(random, x_ea) # Create 1 solution randomly and 

81 y_ea: int | float = evaluate(x_ea) + ofs # evaluate it. 

82 xcopy(x_fea, x_ea) # FEA and SA start with the same initial solution. 

83 y_fea: int | float = y_ea 

84 

85 sa_max_no_lt_moves: int = 1 # maximum no-improvement moves for SA 

86 sa_no_lt_moves: int = 0 # current no-improvement moves 

87 use_ffa: bool = False # We start with the SA branch. 

88 tau: int = 0 # The iteration index, needs to be 0 at first cmp. 

89 

90 while not should_terminate(): # Until we need to quit... 

91 # Sample and evaluate new solution. 

92 op1(random, x_new, x_fea if use_ffa else x_ea) 

93 y_new: int | float = evaluate(x_new) + ofs 

94 h[y_new] += 1 # type: ignore # Always update H. 

95 

96 if use_ffa: # The FEA branch uses FFA. 

97 use_ffa = False # Always toggle use from FFA to SA. 

98 sa_no_lt_moves = 0 # Reset the SA no-improv move counter. 

99 

100 h[y_fea] += 1 # type: ignore # Update H for FEA solution. 

101 if h[y_new] <= h[y_fea]: # type: ignore # FEA acceptance. 

102 xcopy(x_ea, x_new) # Copy solution also to SA. 

103 x_fea, x_new = x_new, x_fea 

104 y_fea = y_new 

105 else: # FEA does not accept, but we always copy to the SA, 

106 x_ea, x_new = x_new, x_ea # so we quickly swap here. 

107 y_ea = y_new # we always copy the solution over to the SA 

108 

109 else: # SA branch performs simulated annealing. 

110 h[y_ea] += 1 # type: ignore # Update H in *both* branches. 

111 

112 if y_new < y_fea: # Is new solution better than the FEA one? 

113 xcopy(x_fea, x_new) # Copy solution over to FEA. 

114 y_fea = y_new # And store the objective value. 

115 

116 if (y_new <= y_ea) or ( # Accept if <= or if SA criterion 

117 r01() < exp((y_ea - y_new) / temperature(tau))): 

118 x_ea, x_new = x_new, x_ea # Accept new solution. 

119 y_new, y_ea = y_ea, y_new # Swap values (for line below). 

120 if y_new > y_ea: # Check if we did an actual improvement. 

121 sa_no_lt_moves = 0 # non-improving moves counter = 0. 

122 continue # We can jump to the next iteration. 

123 

124 sa_no_lt_moves += 1 # Increase non-improved counter. 

125 if sa_no_lt_moves >= sa_max_no_lt_moves: # Toggle: SA to FEA. 

126 sa_max_no_lt_moves += 1 # Increment limit by one. 

127 use_ffa = True # Toggle to FFA. 

128 tau += 1 # Step the iteration index. 

129 

130 if not self.__log_h_tbl: 

131 return # we are done here 

132 

133 # After we are done, we want to print the H-table. 

134 if h[y_ea] == 0: # type: ignore # Fix the H-table for the case 

135 h = Counter() # that only one FE was performed: In this case, 

136 h[y_ea] = 1 # make Counter with only a single 1 value inside. 

137 

138 log_h(process, h, ofs) # log the H-table 

139 

140 def log_parameters_to(self, logger: KeyValueLogSection) -> None: 

141 """ 

142 Log all parameters of the SAFEA-N algorithm. 

143 

144 :param logger: the logger for the parameters 

145 """ 

146 super().log_parameters_to(logger) 

147 with logger.scope("ts") as ts: 

148 self.schedule.log_parameters_to(ts)