Coverage for moptipyapps / order1d / instance.py: 83%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 04:40 +0000

1"""An instance of the ordering problem.""" 

2 

3 

4from math import isfinite 

5from typing import Callable, Final, Iterable, TypeVar, cast 

6 

7import numpy as np 

8from moptipy.utils.logger import KeyValueLogSection 

9from moptipy.utils.nputils import DEFAULT_INT 

10from moptipy.utils.strings import ( 

11 num_to_str_for_name, 

12 sanitize_name, 

13) 

14from pycommons.types import check_int_range, type_error 

15from scipy.stats import rankdata # type: ignore 

16 

17from moptipyapps.qap.instance import Instance as QAPInstance 

18 

19#: the type variable for the source object 

20T = TypeVar("T") 

21 

22#: the zero-based index column name 

23_ZERO_BASED_INDEX: Final[str] = "indexZeroBased" 

24#: the suggested x-coordinate 

25_SUGGESTED_X_IN_0_1: Final[str] = "suggestedXin01" 

26 

27 

28class Instance(QAPInstance): 

29 """ 

30 An instance of the One-Dimensional Ordering Problem. 

31 

32 Such an instance represents the ranking of objects by their distance to 

33 each other as a :mod:`~moptipyapps.qap` problem. 

34 

35 >>> def _dist(a, b): 

36 ... return abs(a - b) 

37 >>> def _tags(a): 

38 ... return f"t{a}" 

39 >>> the_instance = Instance.from_sequence_and_distance( 

40 ... [1, 2, 3], _dist, 1, 100, ("bla", ), _tags) 

41 >>> print(the_instance) 

42 order1d_3_1 

43 >>> print(the_instance.distances) 

44 [[0 1 2] 

45 [1 0 1] 

46 [2 1 0]] 

47 >>> print(the_instance.flows) 

48 [[0 4 2] 

49 [3 0 3] 

50 [2 4 0]] 

51 """ 

52 

53 def __init__(self, distances: np.ndarray, 

54 flow_power: int | float, 

55 horizon: int, 

56 tag_titles: Iterable[str], 

57 tags: Iterable[tuple[Iterable[str] | str, int]], 

58 name: str | None = None) -> None: 

59 """ 

60 Create an instance of the quadratic assignment problem. 

61 

62 :param distances: the original distance matrix 

63 :param flow_power: the power to be used for constructing the flows 

64 based on the original distances 

65 :param horizon: the horizon for distance ranks, after which all 

66 elements are ignored 

67 :param tag_titles: the tag titles 

68 :param tags: the assignment of rows to names 

69 :param name: the instance name 

70 """ 

71 if not isinstance(flow_power, int | float): 

72 raise type_error(flow_power, "flow_power", (int, float)) 

73 if not (isfinite(flow_power) and (0 < flow_power < 100)): 

74 raise ValueError( 

75 f"flow_power must be in (0, 100), but is {flow_power}.") 

76 

77 n: Final[int] = len(distances) # the number of elements 

78 horizon = check_int_range(horizon, "horizon", 1, 1_000_000_000_000) 

79 

80 # construct the distance matrix 

81 dist_matrix = np.zeros((n, n), DEFAULT_INT) 

82 for i in range(n): 

83 for j in range(i + 1, n): 

84 dist_matrix[i, j] = dist_matrix[j, i] = j - i 

85 

86 # construct the flow matrix 

87 flows: np.ndarray = rankdata( 

88 distances, axis=1, method="average") - 1.0 

89 

90 # get the flow rank multiplier 

91 multiplier: float = 1.0 

92 for i in range(n): 

93 for j in range(n): 

94 if i == j: 

95 continue 

96 f = flows[i, j] 

97 if int(f) < f <= horizon: # dow we need the multiplier? 

98 multiplier = 0.5 ** (-flow_power) 

99 break 

100 

101 flow_matrix = np.zeros((n, n), DEFAULT_INT) 

102 max_val: Final[int] = min(n - 1, horizon) 

103 for i in range(n): 

104 for j in range(n): 

105 if i == j: 

106 continue 

107 f = flows[i, j] 

108 if f > horizon: 

109 continue 

110 flow_matrix[i, j] = round( 

111 multiplier * ((max_val - f + 1) ** flow_power)) 

112 

113 # construct a name if necessary 

114 if name is None: 

115 name = f"order1d_{n}_{num_to_str_for_name(flow_power)}" 

116 if max_val < (n - 1): 

117 name = f"{name}_{max_val}" 

118 

119 super().__init__(dist_matrix, flow_matrix, name=name) 

120 

121 if self.n != n: 

122 raise ValueError("error when assigning number of elements!") 

123 

124 #: the flow power 

125 self.flow_power: Final[int | float] = flow_power 

126 #: the horizon 

127 self.horizon: Final[int] = max_val 

128 

129 if not isinstance(tags, Iterable): 

130 raise type_error(tags, "tags", Iterable) 

131 if not isinstance(tag_titles, Iterable): 

132 raise type_error(tag_titles, "tag_titles", Iterable) 

133 

134 #: the tag titles 

135 self.tag_titles: Final[tuple[str, ...]] = tuple(map( 

136 sanitize_name, tag_titles)) 

137 req_len: Final[int] = len(self.tag_titles) 

138 if req_len <= 0: 

139 raise ValueError("No tags specified.") 

140 

141 #: the tags, i.e., additional data for each original element 

142 self.tags: Final[tuple[tuple[tuple[str, ...], int], ...]] = ( 

143 tuple(((t, ) if isinstance(t, str) 

144 else tuple(t), k) for t, k in tags)) 

145 if _ZERO_BASED_INDEX in self.tags: 

146 raise ValueError(f"{_ZERO_BASED_INDEX} not permitted in tags.") 

147 if _SUGGESTED_X_IN_0_1 in self.tags: 

148 raise ValueError(f"{_SUGGESTED_X_IN_0_1} not permitted in tags.") 

149 

150 if len(self.tags) < n: 

151 raise ValueError(f"there must be at least {self.n} tags, but got " 

152 f"{len(self.tags)}, i.e., {self.tags}") 

153 for tag in self.tags: 

154 check_int_range(tag[1], "id", 0, n) 

155 if len(tag[0]) != req_len: 

156 raise ValueError( 

157 f"all tags must have the same length. " 

158 f"while the first tag ({self.tags[0]}) has " 

159 f"length {req_len}, {tag} has length {len(tag[0])}.") 

160 

161 @staticmethod 

162 def from_sequence_and_distance( 

163 data: Iterable[T | None], 

164 get_distance: Callable[[T, T], int | float], 

165 flow_power: int | float, 

166 horizon: int, 

167 tag_titles: Iterable[str], 

168 get_tags: Callable[[T], str | Iterable[str]], 

169 name: str | None = None) -> "Instance": 

170 """ 

171 Turn a sequence of objects into a One-Dimensional Ordering instance. 

172 

173 :param data: the data source, i.e., an iterable of data elements 

174 :param get_tags: the function for extracting tags from objects 

175 :param get_distance: the function for getting the distance between 

176 objects 

177 :param flow_power: the flow power 

178 :param horizon: the maximal considered rank 

179 :param tag_titles: the tag names 

180 :param name: the optional name 

181 :return: the ordering instance 

182 

183 >>> def _dist(a, b): 

184 ... return abs(a - b) 

185 >>> def _tags(a): 

186 ... return f"x{a}", f"b{a}" 

187 >>> res = Instance.from_sequence_and_distance( 

188 ... [1, 2, 5], _dist, 2, 100, ("a", "b"), _tags) 

189 >>> print(res) 

190 order1d_3_2 

191 >>> print(res.flows) 

192 [[0 4 1] 

193 [4 0 1] 

194 [1 4 0]] 

195 >>> print(res.distances) 

196 [[0 1 2] 

197 [1 0 1] 

198 [2 1 0]] 

199 >>> res = Instance.from_sequence_and_distance( 

200 ... [1, 2, 35, 4], _dist, 2, 100, ("a", "b"), _tags) 

201 >>> print(res) 

202 order1d_4_2 

203 >>> print(res.flows) 

204 [[0 9 1 4] 

205 [9 0 1 4] 

206 [1 4 0 9] 

207 [4 9 1 0]] 

208 >>> print(res.distances) 

209 [[0 1 2 3] 

210 [1 0 1 2] 

211 [2 1 0 1] 

212 [3 2 1 0]] 

213 >>> res = Instance.from_sequence_and_distance( 

214 ... [1, 2, 4, 4], _dist, 2, 100, ("a", "b"), _tags) 

215 >>> print(res) 

216 order1d_3_2 

217 >>> print(res.flows) 

218 [[0 4 1] 

219 [4 0 1] 

220 [1 4 0]] 

221 >>> print(res.distances) 

222 [[0 1 2] 

223 [1 0 1] 

224 [2 1 0]] 

225 >>> print(res.tags) 

226 ((('x1', 'b1'), 0), (('x2', 'b2'), 1), (('x4', 'b4'), 2), \ 

227(('x4', 'b4'), 2)) 

228 >>> def _dist2(a, b): 

229 ... return abs(abs(a) - abs(b)) + 1 

230 >>> res = Instance.from_sequence_and_distance( 

231 ... [1, 2, -4, 4], _dist2, 2, 100, ("a", "b"), _tags) 

232 >>> print(res) 

233 order1d_4_2 

234 >>> print(res.flows) 

235 [[ 0 36 9 9] 

236 [36 0 9 9] 

237 [ 4 16 0 36] 

238 [ 4 16 36 0]] 

239 >>> print(res.distances) 

240 [[0 1 2 3] 

241 [1 0 1 2] 

242 [2 1 0 1] 

243 [3 2 1 0]] 

244 >>> res = Instance.from_sequence_and_distance( 

245 ... [1, 2, -4, 4], _dist2, 3, 100, ("a", "b"), _tags) 

246 >>> print(res) 

247 order1d_4_3 

248 >>> print(res.flows) 

249 [[ 0 216 27 27] 

250 [216 0 27 27] 

251 [ 8 64 0 216] 

252 [ 8 64 216 0]] 

253 >>> print(res.distances) 

254 [[0 1 2 3] 

255 [1 0 1 2] 

256 [2 1 0 1] 

257 [3 2 1 0]] 

258 >>> print(res.tags) 

259 ((('x1', 'b1'), 0), (('x2', 'b2'), 1), (('x-4', 'b-4'), 2), \ 

260(('x4', 'b4'), 3)) 

261 >>> res = Instance.from_sequence_and_distance( 

262 ... [1, 2, -4, 4], _dist2, 3, 2, ("a", "b"), _tags) 

263 >>> print(res) 

264 order1d_4_3_2 

265 >>> print(res.flows) 

266 [[0 8 0 0] 

267 [8 0 0 0] 

268 [0 1 0 8] 

269 [0 1 8 0]] 

270 >>> print(res.distances) 

271 [[0 1 2 3] 

272 [1 0 1 2] 

273 [2 1 0 1] 

274 [3 2 1 0]] 

275 >>> print(res.tags) 

276 ((('x1', 'b1'), 0), (('x2', 'b2'), 1), (('x-4', 'b-4'), 2), \ 

277(('x4', 'b4'), 3)) 

278 >>> res = Instance.from_sequence_and_distance( 

279 ... [1, 2, -4, 4], _dist2, 2, 2, ("a", "b"), _tags) 

280 >>> print(res) 

281 order1d_4_2_2 

282 >>> print(res.flows) 

283 [[0 4 0 0] 

284 [4 0 0 0] 

285 [0 1 0 4] 

286 [0 1 4 0]] 

287 >>> print(res.distances) 

288 [[0 1 2 3] 

289 [1 0 1 2] 

290 [2 1 0 1] 

291 [3 2 1 0]] 

292 """ 

293 if not isinstance(data, Iterable): 

294 raise type_error(data, "data", Iterable) 

295 if not callable(get_tags): 

296 raise type_error(get_tags, "get_tags", call=True) 

297 if not callable(get_distance): 

298 raise type_error(get_distance, "get_distance", call=True) 

299 if not isinstance(tag_titles, Iterable): 

300 raise type_error(tag_titles, "tag_titles", Iterable) 

301 

302 # build a distance matrix and purge all zero-distance elements 

303 datal: list[T] = cast("list[T]", data) \ 

304 if isinstance(data, list) else list(data) 

305 mappings: list[tuple[T, int]] = [] 

306 distances: list[list[int | float]] = [] 

307 i: int = 0 

308 while i < len(datal): 

309 o1: T = datal[i] 

310 j: int = i + 1 

311 current_dists: list[int | float] = [d[i] for d in distances] 

312 current_dists.append(0) 

313 while j < len(datal): 

314 o2: T = datal[j] 

315 dist: int | float = get_distance(o1, o2) 

316 if not (isfinite(dist) and (0 <= dist <= 1e100)): 

317 raise ValueError( 

318 f"invalid distance {dist} for objects {i}, {j}") 

319 if dist <= 0: # distance == 0, must purge 

320 mappings.append((o2, i)) 

321 del datal[j] 

322 for ds in distances: 

323 del ds[j] 

324 continue 

325 current_dists.append(dist) 

326 j += 1 

327 mappings.append((o1, i)) 

328 distances.append(current_dists) 

329 i += 1 

330 

331 # we now got a full distance matrix, let's turn it into a rank matrix 

332 return Instance(np.array(distances), 

333 flow_power, horizon, tag_titles, 

334 ((get_tags(obj), idx) for obj, idx in mappings), 

335 name) 

336 

337 def log_parameters_to(self, logger: KeyValueLogSection) -> None: 

338 """ 

339 Log all parameters of this component as key-value pairs. 

340 

341 :param logger: the logger for the parameters 

342 """ 

343 super().log_parameters_to(logger) 

344 logger.key_value("flowPower", self.flow_power) 

345 logger.key_value("horizon", self.horizon) 

346 logger.key_value("nOrig", len(self.tags))