Coverage for moptipy / algorithms / so / ffa / ffa_h.py: 90%

62 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1"""Here we provide some tools for implementing FFA.""" 

2 

3from collections import Counter 

4from io import StringIO 

5from math import nan 

6from typing import Callable, Final 

7 

8import numpy as np 

9from pycommons.io.csv import CSV_SEPARATOR 

10from pycommons.math.int_math import try_int 

11from pycommons.types import type_error 

12 

13from moptipy.api.objective import Objective 

14from moptipy.api.process import Process 

15from moptipy.utils.nputils import DEFAULT_INT 

16 

#: The name of the log section into which the frequency table is written.
H_LOG_SECTION: Final[str] = "H"

#: The largest difference between the upper and the lower bound of an
#: integer objective function for which the frequency table `H` is still
#: backed by an array; for any wider range a map is used instead.
SWITCH_TO_MAP_RANGE: Final[int] = 1 << 26  # == 67_108_864

23 

24 

def create_h(objective: Objective) -> tuple[
        np.ndarray | Counter[int | float], int]:
    """
    Allocate the backing store of the frequency table `H` used by FFA.

    Two kinds of storage are possible. A :class:`numpy.ndarray` is chosen
    if the objective function reports that it always yields integers, if
    both of its bounds are `int` values, and if the difference between the
    upper and the lower bound does not exceed
    :const:`SWITCH_TO_MAP_RANGE`. In every other situation, e.g., for
    `float`-valued objectives or for very wide or unbounded ranges, an
    empty :class:`Counter` is returned.

    Since the lower bound of an integer objective may be negative, the
    array variant comes with an offset (the negated lower bound). Adding
    this offset to an objective value gives a non-negative array index.
    For the :class:`Counter` variant, the offset is always `0`.

    :param objective: the objective function for which the table is
        created
    :return: a tuple with the `H` data structure and the offset that must
        be added to each objective value before accessing the structure
    :raises TypeError: if `objective` is not an :class:`Objective`
    """
    if not isinstance(objective, Objective):
        raise type_error(objective, "objective", Objective)

    # Objectives that may return floats can only be handled by a map.
    if not objective.is_always_integer():
        return Counter(), 0

    lower: Final[int | float] = objective.lower_bound()
    upper: Final[int | float] = objective.upper_bound()

    # Infinite (float) bounds cannot be used to size an array.
    if not (isinstance(upper, int) and isinstance(lower, int)):
        return Counter(), 0

    # Too wide a range: an array would waste too much memory.
    if (upper - lower) > SWITCH_TO_MAP_RANGE:
        return Counter(), 0

    # One slot per possible objective value, shifted by the lower bound.
    return np.zeros(upper - lower + 1, DEFAULT_INT), -lower

58 

59 

60def clear_h(h: np.ndarray | Counter[int | float]) -> None: 

61 """ 

62 Clear a H-Table. 

63 

64 :param h: the H-table to clear. 

65 """ 

66 if isinstance(h, np.ndarray): 

67 h.fill(0) 

68 elif isinstance(h, dict): 

69 h.clear() 

70 else: 

71 raise type_error(h, "h", (np.ndarray, Counter)) 

72 

73 

def h_to_str(
        h: np.ndarray | Counter[int | float],
        offset: int) -> str:
    """
    Render a `H`-table as a single line of separator-delimited text.

    Only entries with a frequency greater than `0` are written. Each of
    them contributes two fields: the objective value (the key or index
    minus `offset`) and its frequency. Two abbreviations keep the text
    short: The objective value field is left empty if the value equals
    the previously written objective value plus one, and the frequency
    field is left empty if the frequency is `1`.

    :param h: the `H`-table, as created by :func:`create_h`.
    :param offset: the offset, as computed by :func:`create_h`.
    :return: the textual representation of the table
    :raises TypeError: if `h` or `offset` are of the wrong type
    :raises ValueError: if the resulting text would be empty

    >>> hl = np.array([0, 0, 1, 7, 4, 0, 0, 9, 0])
    >>> h_to_str(hl, 0)
    '2;;;7;;4;7;9'
    >>> h_to_str(hl, -1)
    '3;;;7;;4;8;9'
    >>> hd = Counter((1, 1, 1, 1, 1, 4, 4, 4, 4, 4, 4, 4, 3, 3, 3, 3,
    ...               3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2))
    >>> h_to_str(hd, 0)
    '1;5;;9;;6;;7'
    >>> try:
    ...     hd = {1: 0}
    ...     h_to_str(hd, 0)
    ... except ValueError as ve:
    ...     print(ve)
    empty H table?
    >>> hx = np.zeros(100, int)
    >>> hx[10] = 4
    >>> hx[12] = 234
    >>> hx[89] = 111
    >>> hx[90] = 1
    >>> hx[45] = 2314
    >>> h_to_str(hx, 0)
    '10;4;12;234;45;2314;89;111;;'
    """
    if not isinstance(h, (np.ndarray, dict)):
        raise type_error(h, "h", (np.ndarray, Counter))
    if not isinstance(offset, int):
        raise type_error(offset, "offset", int)
    delim: Final[str] = CSV_SEPARATOR

    # Decide in which order the table is traversed: arrays by index, maps
    # by their keys. The keys are sorted unless there are too many of them.
    if isinstance(h, np.ndarray):
        keys = range(len(h))
    elif len(h) <= SWITCH_TO_MAP_RANGE:
        keys = sorted(h.keys())
    else:
        keys = h.keys()

    with StringIO() as buffer:
        emit: Callable[[str], int] = buffer.write  # fast call
        previous: int | float = nan  # nan never equals `current - 1`
        is_first: bool = True

        for key in keys:
            count: int | float = h[key]  # type: ignore
            if count <= 0:  # this objective value was never encountered
                continue

            if is_first:  # no separator in front of the very first entry
                is_first = False
            else:
                emit(delim)

            current: int | float = key - offset  # undo the index shift
            if isinstance(current, float):  # prefer int notation
                current = try_int(current)
            if (current - 1) != previous:  # omit if it is previous + 1
                emit(str(current))
            previous = current

            emit(delim)  # separates objective value from frequency
            if count > 1:  # a frequency of 1 is left implicit
                emit(str(count))

        text: Final[str] = buffer.getvalue()

    if not text:
        raise ValueError("empty H table?")
    return text

144 

145 

def log_h(process: Process, h: np.ndarray | Counter[int | float],
          offset: int) -> None:
    """
    Write a frequency table `H` into the log of a process, if it has one.

    The table is converted to text via :func:`h_to_str` and stored as a
    single line in the log section `H`, which is delimited by the lines
    `BEGIN_H` and `END_H`. This line is made up of `2*n` values separated
    by semicolons, i.e., of `n` pairs of an objective value `y` (an
    integer or a float) and its observed frequency `H[y]` (an integer).
    If two consecutive objective values have the form `f` and `f+1`, then
    the second one is omitted, and so on.
    Nothing is written if the process does not have a log.

    :param process: the process to whose log the table is added
    :param h: the `H`-table, as created by :func:`create_h`.
    :param offset: the offset, as computed by :func:`create_h`.
    :raises TypeError: if `process` is not a :class:`Process`
    """
    if not isinstance(process, Process):
        raise type_error(process, "process", Process)
    if not process.has_log():
        return  # no log, so the table is not even converted to text
    process.add_log_section(H_LOG_SECTION, h_to_str(h, offset))

165 if process.has_log(): 

166 process.add_log_section(H_LOG_SECTION, h_to_str(h, offset))