Coverage for moptipy / algorithms / so / ffa / ffa_h.py: 90%
62 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""Here we provide some tools for implementing FFA."""
3from collections import Counter
4from io import StringIO
5from math import nan
6from typing import Callable, Final
8import numpy as np
9from pycommons.io.csv import CSV_SEPARATOR
10from pycommons.math.int_math import try_int
11from pycommons.types import type_error
13from moptipy.api.objective import Objective
14from moptipy.api.process import Process
15from moptipy.utils.nputils import DEFAULT_INT
17#: the log section for the frequency table
18H_LOG_SECTION: Final[str] = "H"
20#: The difference between upper- and lower bound at which we switch from
21#: using arrays as backing store for the frequency table H to maps.
22SWITCH_TO_MAP_RANGE: Final[int] = 67_108_864
25def create_h(objective: Objective) -> tuple[
26 np.ndarray | Counter[int | float], int]:
27 """
28 Create a data structure for recording frequencies.
30 If our objective function always returns integer values and its range is
31 not too large, then we can use a :class:`numpy.ndarray` as a backing
32 datastructure for the `H`-map used in FFA. If the objective function may
33 sometimes return `float` values or has a wide range, it will be better
34 (or even necessary) to use :class:`Counter` instead.
36 This function here makes the decision of what backing datastructure to
37 use.
39 Additionally, it may be the lower or even upper bound of an integer
40 objective function could be negative. To avoid dealing with negative
41 indices, we would then add an offset to each objective value before
42 accessing it. This function will return an appropriate offset as well.
44 :param objective: the objective for which the data structure should be
45 created
46 :return: a tuple of the `H`-data structure as well as an offset to be
47 added to all objective values.
48 """
49 if not isinstance(objective, Objective):
50 raise type_error(objective, "objective", Objective)
51 if objective.is_always_integer():
52 lb: Final[int | float] = objective.lower_bound()
53 ub: Final[int | float] = objective.upper_bound()
54 if isinstance(ub, int) and isinstance(lb, int) \
55 and ((ub - lb) <= SWITCH_TO_MAP_RANGE):
56 return np.zeros(ub - lb + 1, DEFAULT_INT), -lb
57 return Counter(), 0
60def clear_h(h: np.ndarray | Counter[int | float]) -> None:
61 """
62 Clear a H-Table.
64 :param h: the H-table to clear.
65 """
66 if isinstance(h, np.ndarray):
67 h.fill(0)
68 elif isinstance(h, dict):
69 h.clear()
70 else:
71 raise type_error(h, "h", (np.ndarray, Counter))
74def h_to_str(
75 h: np.ndarray | Counter[int | float],
76 offset: int) -> str:
77 """
78 Convert a `H`-table to a string.
80 :param h: the `H`-table, as created by :func:`create_h`.
81 :param offset: the offset, as computed by :func:`create_h`.
83 >>> hl = np.array([0, 0, 1, 7, 4, 0, 0, 9, 0])
84 >>> h_to_str(hl, 0)
85 '2;;;7;;4;7;9'
86 >>> h_to_str(hl, -1)
87 '3;;;7;;4;8;9'
88 >>> hd = Counter((1, 1, 1, 1, 1, 4, 4, 4, 4, 4, 4, 4, 3, 3, 3, 3,
89 ... 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2))
90 >>> h_to_str(hd, 0)
91 '1;5;;9;;6;;7'
92 >>> try:
93 ... hd = {1: 0}
94 ... h_to_str(hd, 0)
95 ... except ValueError as ve:
96 ... print(ve)
97 empty H table?
98 >>> hx = np.zeros(100, int)
99 >>> hx[10] = 4
100 >>> hx[12] = 234
101 >>> hx[89] = 111
102 >>> hx[90] = 1
103 >>> hx[45] = 2314
104 >>> h_to_str(hx, 0)
105 '10;4;12;234;45;2314;89;111;;'
106 """
107 if not isinstance(h, np.ndarray | dict):
108 raise type_error(h, "h", (np.ndarray, Counter))
109 if not isinstance(offset, int):
110 raise type_error(offset, "offset", int)
111 csep: Final[str] = CSV_SEPARATOR
112 sep: str = ""
114 with StringIO() as out:
115 write: Callable[[str], int] = out.write # fast call
116 old_index: int | float = nan
118 # We iterate over the whole `H` table.
119 # If the table is an np.array, then we just use the indices to
120 # iterate directly. If it is a `Counter`, then we iterate over
121 # its keys. If there are not too many keys, then we sort them
122 # first.
123 for i in range(len(h)) if isinstance(h, np.ndarray) else (sorted(
124 h.keys()) if len(h) <= SWITCH_TO_MAP_RANGE else h.keys()):
125 value: int | float = h[i] # type: ignore
126 if value <= 0: # skip over values that have never been hit
127 continue
128 write(sep) # write separator (nothing upon first call)
129 sep = csep # now the next call will write ;
130 use_index: int | float = i - offset # subtract the offset
131 if isinstance(use_index, float): # if it's float, try to convert
132 use_index = try_int(use_index)
133 if (use_index - 1) != old_index: # we skip if current = old + 1
134 write(str(use_index))
135 old_index = use_index # step the index
136 write(csep) # write separator to frequency counter
137 if value > 1:
138 write(str(value)) # write the frequency
139 res: Final[str] = out.getvalue() # get the final string
141 if str.__len__(res) <= 0:
142 raise ValueError("empty H table?")
143 return res
146def log_h(process: Process, h: np.ndarray | Counter[int | float],
147 offset: int) -> None:
148 """
149 Convert a frequency table `H` to a string and log it to a process.
151 The frequency table is logged as a single line of text into a section
152 `H` delimited by the lines `BEGIN_H` and `END_H`. The line consists
153 of `2*n` semicolon separated values. Each such value pair consists of
154 an objective value `y` and its observed frequency `H[y]`. The former is
155 either an integer or a float and the latter is an integer.
156 If two consecutive objective values take the form 'f` and `f+1`, then
157 the second one will be omitted, and so on.
159 :param process: the process
160 :param h: the `H`-table, as created by :func:`create_h`.
161 :param offset: the offset, as computed by :func:`create_h`.
162 """
163 if not isinstance(process, Process):
164 raise type_error(process, "process", Process)
165 if process.has_log():
166 process.add_log_section(H_LOG_SECTION, h_to_str(h, offset))