Coverage for moptipyapps / order1d / instance.py: 83%
113 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 04:40 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 04:40 +0000
1"""An instance of the ordering problem."""
4from math import isfinite
5from typing import Callable, Final, Iterable, TypeVar, cast
7import numpy as np
8from moptipy.utils.logger import KeyValueLogSection
9from moptipy.utils.nputils import DEFAULT_INT
10from moptipy.utils.strings import (
11 num_to_str_for_name,
12 sanitize_name,
13)
14from pycommons.types import check_int_range, type_error
15from scipy.stats import rankdata # type: ignore
17from moptipyapps.qap.instance import Instance as QAPInstance
#: the generic type of the source objects that are turned into an instance
T = TypeVar("T")

#: the name of the column holding the zero-based index
_ZERO_BASED_INDEX: Final[str] = "indexZeroBased"
#: the name of the column holding the suggested x-coordinate
_SUGGESTED_X_IN_0_1: Final[str] = "suggestedXin01"
class Instance(QAPInstance):
    """
    An instance of the One-Dimensional Ordering Problem.

    Such an instance represents the ranking of objects by their distance to
    each other as a :mod:`~moptipyapps.qap` problem.

    >>> def _dist(a, b):
    ...     return abs(a - b)
    >>> def _tags(a):
    ...     return f"t{a}"
    >>> the_instance = Instance.from_sequence_and_distance(
    ...     [1, 2, 3], _dist, 1, 100, ("bla", ), _tags)
    >>> print(the_instance)
    order1d_3_1
    >>> print(the_instance.distances)
    [[0 1 2]
     [1 0 1]
     [2 1 0]]
    >>> print(the_instance.flows)
    [[0 4 2]
     [3 0 3]
     [2 4 0]]
    """

    def __init__(self, distances: np.ndarray,
                 flow_power: int | float,
                 horizon: int,
                 tag_titles: Iterable[str],
                 tags: Iterable[tuple[Iterable[str] | str, int]],
                 name: str | None = None) -> None:
        """
        Create an instance of the One-Dimensional Ordering Problem.

        :param distances: the original distance matrix
        :param flow_power: the power to be used for constructing the flows
            based on the original distances, must be in `(0, 100)`
        :param horizon: the horizon for distance ranks, after which all
            elements are ignored
        :param tag_titles: the tag titles, one per tag column; the internal
            column names `indexZeroBased` and `suggestedXin01` are rejected
        :param tags: the assignment of rows to names, given as pairs of
            tag value(s) and the zero-based row index in `0..n-1`
        :param name: the instance name, generated from the number of
            elements, the flow power, and the horizon if `None`
        :raises ValueError: if `flow_power` is not finite or outside of
            `(0, 100)`, if no tag title is given, if a tag title equals an
            internal column name, if fewer than `n` tags are given, or if a
            tag does not have one value per tag title
        :raises TypeError: if `flow_power`, `tags`, or `tag_titles` has the
            wrong type (the exception object is built by `type_error`)
        """
        if not isinstance(flow_power, int | float):
            raise type_error(flow_power, "flow_power", (int, float))
        if not (isfinite(flow_power) and (0 < flow_power < 100)):
            raise ValueError(
                f"flow_power must be in (0, 100), but is {flow_power}.")

        n: Final[int] = len(distances)  # the number of elements
        horizon = check_int_range(horizon, "horizon", 1, 1_000_000_000_000)

        # construct the distance matrix: the distance between two
        # positions is the absolute difference of their indices
        dist_matrix = np.zeros((n, n), DEFAULT_INT)
        for i in range(n):
            for j in range(i + 1, n):
                dist_matrix[i, j] = dist_matrix[j, i] = j - i

        # construct the flow matrix: each row holds the zero-based ranks of
        # the distances in that row, ties receive the average rank
        flows: np.ndarray = rankdata(
            distances, axis=1, method="average") - 1.0

        # Get the flow rank multiplier: it is only needed if at least one
        # off-diagonal rank within the horizon is fractional (i.e., stems
        # from a tie). Multiplying by 0.5 ** -flow_power == 2 ** flow_power
        # is the same as doubling the base before exponentiation.
        needs_multiplier: Final[bool] = any(
            int(flows[i, j]) < flows[i, j] <= horizon
            for i in range(n) for j in range(n) if i != j)
        multiplier: Final[float] = (
            0.5 ** (-flow_power) if needs_multiplier else 1.0)

        flow_matrix = np.zeros((n, n), DEFAULT_INT)
        max_val: Final[int] = min(n - 1, horizon)
        for i in range(n):
            for j in range(n):
                if i == j:
                    continue
                f = flows[i, j]
                if f > horizon:  # ranks beyond the horizon get flow 0
                    continue
                flow_matrix[i, j] = round(
                    multiplier * ((max_val - f + 1) ** flow_power))

        # construct a name if necessary
        if name is None:
            name = f"order1d_{n}_{num_to_str_for_name(flow_power)}"
            if max_val < (n - 1):
                name = f"{name}_{max_val}"

        super().__init__(dist_matrix, flow_matrix, name=name)

        if self.n != n:
            raise ValueError("error when assigning number of elements!")

        #: the flow power
        self.flow_power: Final[int | float] = flow_power
        #: the horizon
        self.horizon: Final[int] = max_val

        if not isinstance(tags, Iterable):
            raise type_error(tags, "tags", Iterable)
        if not isinstance(tag_titles, Iterable):
            raise type_error(tag_titles, "tag_titles", Iterable)

        #: the tag titles
        self.tag_titles: Final[tuple[str, ...]] = tuple(map(
            sanitize_name, tag_titles))
        req_len: Final[int] = len(self.tag_titles)
        if req_len <= 0:
            raise ValueError("No tags specified.")

        #: the tags, i.e., additional data for each original element
        self.tags: Final[tuple[tuple[tuple[str, ...], int], ...]] = (
            tuple(((t, ) if isinstance(t, str)
                   else tuple(t), k) for t, k in tags))

        # The internal column names must not be used as tag titles. This
        # has to be checked against the titles: `self.tags` only contains
        # (values, index) pairs and can never contain a plain string.
        for reserved in (_ZERO_BASED_INDEX, _SUGGESTED_X_IN_0_1):
            if reserved in self.tag_titles:
                raise ValueError(f"{reserved} not permitted in tags.")

        if len(self.tags) < n:
            raise ValueError(f"there must be at least {self.n} tags, but got "
                             f"{len(self.tags)}, i.e., {self.tags}")
        for tag in self.tags:
            # the index refers to one of the n rows, i.e., is in 0..n-1
            check_int_range(tag[1], "id", 0, n - 1)
            if len(tag[0]) != req_len:
                raise ValueError(
                    f"all tags must have the same length as the tag "
                    f"titles, i.e., {req_len}, but {tag} has "
                    f"length {len(tag[0])}.")

    @staticmethod
    def from_sequence_and_distance(
            data: Iterable[T | None],
            get_distance: Callable[[T, T], int | float],
            flow_power: int | float,
            horizon: int,
            tag_titles: Iterable[str],
            get_tags: Callable[[T], str | Iterable[str]],
            name: str | None = None) -> "Instance":
        """
        Turn a sequence of objects into a One-Dimensional Ordering instance.

        Objects whose distance to an earlier object is zero are purged from
        the distance matrix, but they remain present in the tags, where they
        share the index of the object that they duplicate. The `data`
        passed in is never modified.

        :param data: the data source, i.e., an iterable of data elements
        :param get_tags: the function for extracting tags from objects
        :param get_distance: the function for getting the distance between
            objects, must return finite values in `[0, 1e100]`
        :param flow_power: the flow power
        :param horizon: the maximal considered rank
        :param tag_titles: the tag names
        :param name: the optional name
        :return: the ordering instance
        :raises ValueError: if a distance is not finite or outside of
            `[0, 1e100]`

        >>> def _dist(a, b):
        ...     return abs(a - b)
        >>> def _tags(a):
        ...     return f"x{a}", f"b{a}"
        >>> res = Instance.from_sequence_and_distance(
        ...     [1, 2, 5], _dist, 2, 100, ("a", "b"), _tags)
        >>> print(res)
        order1d_3_2
        >>> print(res.flows)
        [[0 4 1]
         [4 0 1]
         [1 4 0]]
        >>> print(res.distances)
        [[0 1 2]
         [1 0 1]
         [2 1 0]]
        >>> res = Instance.from_sequence_and_distance(
        ...     [1, 2, 35, 4], _dist, 2, 100, ("a", "b"), _tags)
        >>> print(res)
        order1d_4_2
        >>> print(res.flows)
        [[0 9 1 4]
         [9 0 1 4]
         [1 4 0 9]
         [4 9 1 0]]
        >>> print(res.distances)
        [[0 1 2 3]
         [1 0 1 2]
         [2 1 0 1]
         [3 2 1 0]]
        >>> res = Instance.from_sequence_and_distance(
        ...     [1, 2, 4, 4], _dist, 2, 100, ("a", "b"), _tags)
        >>> print(res)
        order1d_3_2
        >>> print(res.flows)
        [[0 4 1]
         [4 0 1]
         [1 4 0]]
        >>> print(res.distances)
        [[0 1 2]
         [1 0 1]
         [2 1 0]]
        >>> print(res.tags)
        ((('x1', 'b1'), 0), (('x2', 'b2'), 1), (('x4', 'b4'), 2), \
(('x4', 'b4'), 2))
        >>> def _dist2(a, b):
        ...     return abs(abs(a) - abs(b)) + 1
        >>> res = Instance.from_sequence_and_distance(
        ...     [1, 2, -4, 4], _dist2, 2, 100, ("a", "b"), _tags)
        >>> print(res)
        order1d_4_2
        >>> print(res.flows)
        [[ 0 36  9  9]
         [36  0  9  9]
         [ 4 16  0 36]
         [ 4 16 36  0]]
        >>> print(res.distances)
        [[0 1 2 3]
         [1 0 1 2]
         [2 1 0 1]
         [3 2 1 0]]
        >>> res = Instance.from_sequence_and_distance(
        ...     [1, 2, -4, 4], _dist2, 3, 100, ("a", "b"), _tags)
        >>> print(res)
        order1d_4_3
        >>> print(res.flows)
        [[  0 216  27  27]
         [216   0  27  27]
         [  8  64   0 216]
         [  8  64 216   0]]
        >>> print(res.distances)
        [[0 1 2 3]
         [1 0 1 2]
         [2 1 0 1]
         [3 2 1 0]]
        >>> print(res.tags)
        ((('x1', 'b1'), 0), (('x2', 'b2'), 1), (('x-4', 'b-4'), 2), \
(('x4', 'b4'), 3))
        >>> res = Instance.from_sequence_and_distance(
        ...     [1, 2, -4, 4], _dist2, 3, 2, ("a", "b"), _tags)
        >>> print(res)
        order1d_4_3_2
        >>> print(res.flows)
        [[0 8 0 0]
         [8 0 0 0]
         [0 1 0 8]
         [0 1 8 0]]
        >>> print(res.distances)
        [[0 1 2 3]
         [1 0 1 2]
         [2 1 0 1]
         [3 2 1 0]]
        >>> print(res.tags)
        ((('x1', 'b1'), 0), (('x2', 'b2'), 1), (('x-4', 'b-4'), 2), \
(('x4', 'b4'), 3))
        >>> res = Instance.from_sequence_and_distance(
        ...     [1, 2, -4, 4], _dist2, 2, 2, ("a", "b"), _tags)
        >>> print(res)
        order1d_4_2_2
        >>> print(res.flows)
        [[0 4 0 0]
         [4 0 0 0]
         [0 1 0 4]
         [0 1 4 0]]
        >>> print(res.distances)
        [[0 1 2 3]
         [1 0 1 2]
         [2 1 0 1]
         [3 2 1 0]]

        The list handed in as `data` is left untouched, even if duplicates
        are purged:

        >>> src = [1, 2, 4, 4]
        >>> res = Instance.from_sequence_and_distance(
        ...     src, _dist, 2, 100, ("a", "b"), _tags)
        >>> print(src)
        [1, 2, 4, 4]
        """
        if not isinstance(data, Iterable):
            raise type_error(data, "data", Iterable)
        if not callable(get_tags):
            raise type_error(get_tags, "get_tags", call=True)
        if not callable(get_distance):
            raise type_error(get_distance, "get_distance", call=True)
        if not isinstance(tag_titles, Iterable):
            raise type_error(tag_titles, "tag_titles", Iterable)

        # Build a distance matrix and purge all zero-distance elements.
        # Always work on a private copy: purging deletes elements from the
        # list, which must never alter a list owned by the caller.
        datal: list[T] = cast("list[T]", list(data))
        mappings: list[tuple[T, int]] = []
        distances: list[list[int | float]] = []
        i: int = 0
        while i < len(datal):
            o1: T = datal[i]
            j: int = i + 1
            # the distances to the already processed objects are taken from
            # their rows, i.e., the distance is treated as symmetric
            current_dists: list[int | float] = [d[i] for d in distances]
            current_dists.append(0)  # the distance of the object to itself
            while j < len(datal):
                o2: T = datal[j]
                dist: int | float = get_distance(o1, o2)
                if not (isfinite(dist) and (0 <= dist <= 1e100)):
                    raise ValueError(
                        f"invalid distance {dist} for objects {i}, {j}")
                if dist <= 0:  # distance == 0, must purge
                    # the purged object shares the index of its duplicate
                    mappings.append((o2, i))
                    del datal[j]
                    for ds in distances:
                        del ds[j]
                    continue  # j now refers to the next object
                current_dists.append(dist)
                j += 1
            mappings.append((o1, i))
            distances.append(current_dists)
            i += 1

        # we now got a full distance matrix, let's turn it into a rank matrix
        return Instance(np.array(distances),
                        flow_power, horizon, tag_titles,
                        ((get_tags(obj), idx) for obj, idx in mappings),
                        name)

    def log_parameters_to(self, logger: KeyValueLogSection) -> None:
        """
        Log all parameters of this component as key-value pairs.

        In addition to the parameters logged by the super class, this
        writes the flow power (`flowPower`), the horizon (`horizon`), and
        the number of tags (`nOrig`).

        :param logger: the logger for the parameters
        """
        super().log_parameters_to(logger)
        logger.key_value("flowPower", self.flow_power)
        logger.key_value("horizon", self.horizon)
        logger.key_value("nOrig", len(self.tags))