Coverage for moptipy / tests / mo_archive_pruner.py: 79%

131 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 08:49 +0000

1"""Functions for testing multi-objective archive pruners.""" 

2from typing import Callable, Final, Iterable 

3 

4import numpy as np 

5from numpy.random import Generator, default_rng 

6from pycommons.types import check_int_range, type_error 

7 

8from moptipy.api.mo_archive import ( 

9 MOArchivePruner, 

10 MORecord, 

11 check_mo_archive_pruner, 

12) 

13from moptipy.api.mo_problem import MOProblem 

14from moptipy.mock.mo_problem import MockMOProblem 

15from moptipy.tests.component import validate_component 

16from moptipy.utils.nputils import DEFAULT_NUMERICAL 

17 

18 

def __run_single_test(
        pruner_factory: Callable[[MOProblem], MOArchivePruner],
        random: Generator,
        dim: int,
        dt: np.dtype) -> bool:
    """
    Run a single randomized test of a multi-objective archive pruner.

    A mock problem with `dim` objectives and objective data type `dt` is
    created and handed to `pruner_factory`. The resulting pruner is
    validated as component. Then an archive length `alen` from `2..9` and a
    pruning limit `amax` from `1..alen` are drawn at random.

    The test is performed in one or two modes: First with the records as
    sampled and, if `dim > 1`, a second time in "collapse" mode, where one
    randomly chosen objective dimension is overwritten with the same value
    in all records. In each mode, three archives with equal contents are
    built from `alen` unique sampled objective vectors. `archive_1` and
    `archive_3` are pruned separately, `archive_2` is left untouched as
    reference. After pruning, the following is checked:

    - the length of `archive_1` did not change,
    - no record object appears twice in `archive_1`,
    - all records still carry the original tag and a numpy array,
    - `archive_1` and `archive_3` are element-wise equal, i.e., pruning is
      deterministic,
    - if not in collapse mode, every record of `archive_1` matches a
      distinct record of the untouched `archive_2`, i.e., no record was
      lost, altered, or duplicated.

    :param pruner_factory: the factory for creating pruners
    :param random: the random number generator
    :param dim: the dimension
    :param dt: the data type
    :returns: `True` if a test was run, `False` if we need to try again
        because not enough unique records could be sampled
    :raises TypeError: if an object of a wrong type is encountered
    :raises ValueError: if the pruner violates its contract
    """
    mop: Final[MockMOProblem] = MockMOProblem.for_dtype(dim, dt)
    if not isinstance(mop, MockMOProblem):
        raise type_error(mop, "new mock problem", MockMOProblem)

    tag: Final[str] = f"bla{random.integers(100)}"
    pruner = pruner_factory(mop)
    if not isinstance(pruner, MOArchivePruner):
        raise type_error(pruner, "pruner", MOArchivePruner)
    check_mo_archive_pruner(pruner)
    validate_component(pruner)

    # the upper bound of `Generator.integers` is exclusive
    alen = check_int_range(int(random.integers(2, 10)), "alen", 2, 9)
    amax = check_int_range(int(random.integers(1, alen + 1)), "amax", 1, alen)

    # collapsing one dimension only makes sense if there are several
    dim_mode = [False]
    if dim > 1:
        dim_mode.append(True)

    for use_collapse in dim_mode:
        archive_1: list[MORecord] = []  # will be pruned
        archive_2: list[MORecord] = []  # untouched reference copy
        archive_3: list[MORecord] = []  # will be pruned, too

        collapse_dim: int = -1
        if use_collapse:
            collapse_dim = int(random.integers(dim))

        # the budget of sampling attempts for building this archive
        max_samples: int = 10000

        for _ in range(alen):
            needed: bool = True
            rec: MORecord | None = None
            fs: np.ndarray | None = None
            while needed:  # we make sure that all records are unique
                max_samples -= 1
                if max_samples <= 0:
                    return False
                fs = np.empty(dim, dt)
                if not isinstance(fs, np.ndarray):
                    raise type_error(fs, "fs", np.ndarray)
                if len(fs) != dim:
                    raise ValueError(f"len(fs)={len(fs)}!=dim={dim}")
                mop.sample(fs)
                rec = MORecord(tag, fs)
                if not isinstance(rec, MORecord):
                    raise type_error(rec, "rec", MORecord)
                needed = False
                for z in archive_1:
                    fs2 = z.fs
                    if np.array_equal(fs, fs2):
                        needed = True
                        break
            if (rec is None) or (fs is None):
                raise ValueError("huh?")
            archive_1.append(rec)
            rec2 = MORecord(tag, fs.copy())
            if (rec.x != rec2.x) \
                    or (not np.array_equal(rec2.fs, rec.fs)):
                raise ValueError(f"{rec} != {rec2}")
            archive_2.append(rec2)
            rec2 = MORecord(tag, fs.copy())
            archive_3.append(rec2)

        # done creating archive and copy of archive

        if not isinstance(archive_1, list):
            raise type_error(archive_1, "archive_1", list)
        if not isinstance(archive_2, list):
            raise type_error(archive_2, "archive_2", list)
        if not isinstance(archive_3, list):
            raise type_error(archive_3, "archive_3", list)

        thelen: int = len(archive_1)
        if not isinstance(thelen, int):
            raise type_error(thelen, "len(archive)", int)
        if thelen != alen:
            raise ValueError(
                f"{alen} != len(archive_1)={len(archive_1)}?")

        if use_collapse:
            # set one objective to the same value in all three archives
            collapse_value = archive_1[random.integers(alen)].fs[
                collapse_dim]
            for rec in archive_1:
                rec.fs[collapse_dim] = collapse_value
            for rec in archive_2:
                rec.fs[collapse_dim] = collapse_value
            for rec in archive_3:
                rec.fs[collapse_dim] = collapse_value

        # perform the pruning
        pruner.prune(archive_1, amax, len(archive_1))
        pruner.prune(archive_3, amax, len(archive_3))

        thelen = len(archive_1)
        if not isinstance(thelen, int):
            raise type_error(thelen, "len(archive_1)", int)
        if thelen != alen:
            raise ValueError(
                f"pruning messed up archive len: {alen} != "
                f"len(archive)={len(archive_1)}?")

        # make sure that no element was deleted or added
        for ii, a in enumerate(archive_1):
            if not isinstance(a, MORecord):
                raise type_error(a, f"archive[{ii}]", MORecord)
            for j in range(ii):
                if archive_1[j] is a:
                    raise ValueError(f"record {a} appears at "
                                     f"indexes {ii} and {j}!")
            if a.x != tag:
                raise ValueError(f"a.x={a.x}!={tag!r}")
            if not isinstance(a.fs, np.ndarray):
                raise type_error(a.fs, "a.fs", np.ndarray)
            # both pruned archives must have exactly the same order
            b = archive_3[ii]
            if (a.x != b.x) or (not np.array_equal(a.fs, b.fs)):
                raise ValueError(
                    f"archive pruning not deterministic, archive_1[{ii}]={a}"
                    f" but archive_3[{ii}]={b}.")
            if not use_collapse:
                # Without collapsing, all records are unique. Each pruned
                # record must therefore consume exactly one record of the
                # untouched reference archive.
                for idx, orig in enumerate(archive_2):
                    if np.array_equal(a.fs, orig.fs):
                        if a.x != orig.x:
                            raise ValueError(
                                f"a.fs={a.fs}==b.fs, but "
                                f"a.x={a.x}!=b.x={orig.x}")
                        del archive_2[idx]
                        break
                else:
                    raise ValueError(
                        f"record {a} at index {ii} of the pruned archive "
                        "matches no remaining record of the original "
                        "archive, i.e., a record was lost or changed.")
    return True

161 

162 

def validate_mo_archive_pruner(
        pruner_factory: Callable[[MOProblem], MOArchivePruner],
        dimensions: Iterable[int],
        dtypes: Iterable[np.dtype] = DEFAULT_NUMERICAL) -> None:
    """
    Check whether an object is a moptipy multi-objective optimization pruner.

    This method checks whether the class is correct and whether the pruning
    follows the general contract: Interesting records in the list to be pruned
    are moved to the front, the ones to discard are moved to the back. No
    record is lost and none is duplicated.

    One randomized test is performed for every combination of an element of
    `dimensions` and an element of `dtypes`. A test that could not sample
    enough unique records is repeated until it succeeds.

    :param pruner_factory: the creator for the multi-objective archive pruner
        to test
    :param dimensions: the dimensions to simulate
    :param dtypes: the dtypes to test on, may be any iterable, including a
        one-shot iterator
    :raises ValueError: if `dimensions` or `dtypes` are empty or if the
        pruner violates the contract
    :raises TypeError: if values of the wrong types are encountered
    """
    if not callable(pruner_factory):
        raise type_error(pruner_factory, "pruner_factory", call=True)
    if not isinstance(dimensions, Iterable):
        raise type_error(dimensions, "dimensions", Iterable)
    if not isinstance(dtypes, Iterable):
        raise type_error(dtypes, "dtypes", Iterable)

    # `dtypes` is traversed once per dimension. A one-shot iterator would be
    # exhausted after the first dimension and all later dimensions would
    # silently remain untested, so we materialize it exactly once.
    use_dtypes: Final[tuple[np.dtype, ...]] = tuple(dtypes)
    for dt in use_dtypes:
        if not isinstance(dt, np.dtype):
            raise type_error(dt, "dt", np.dtype)
    if not use_dtypes:
        raise ValueError("dtypes are empty!")

    nothing: bool = True
    random: Final[Generator] = default_rng()
    for dim in dimensions:
        if not isinstance(dim, int):
            raise type_error(dim, "dimensions[i]", int)
        nothing = False
        for dt in use_dtypes:
            # a result of `False` means "no test was run, try again"
            while not __run_single_test(pruner_factory, random, dim, dt):
                pass

    if nothing:
        raise ValueError("dimensions are empty!")