Coverage for moptipyapps / prodsched / statistics.py: 93%

137 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 08:40 +0000

1""" 

2A statistics record for the simulation. 

3 

4This module provides a record with statistics derived from one single 

5MFC simulation. It can store values such as the mean fill rate or the 

6mean stock level. 

7Such statistics records are filled in by instances of the 

8:class:`~moptipyapps.prodsched.statistics_collector.StatisticsCollector` 

9plugged into the 

10:class:`~moptipyapps.prodsched.simulation.Simulation`. 

11""" 

12 

13from itertools import chain 

14from typing import Callable, Final, Generator, Iterable, Self 

15 

16from moptipy.utils.logger import KEY_VALUE_SEPARATOR 

17from pycommons.io.csv import CSV_SEPARATOR, SCOPE_SEPARATOR 

18from pycommons.math.stream_statistics import ( 

19 KEY_MAXIMUM, 

20 KEY_MEAN_ARITH, 

21 KEY_MINIMUM, 

22 KEY_STDDEV, 

23 StreamStatistics, 

24) 

25from pycommons.strings.string_conv import ( 

26 num_or_none_to_str, 

27 str_to_num_or_none, 

28) 

29from pycommons.types import check_int_range, type_error 

30 

#: the name of the CSV column that holds the statistic's row key
COL_STAT: Final[str] = "stat"
#: the name of the CSV column that holds the overall/total value
COL_TOTAL: Final[str] = "total"
#: the key suffix used for rate statistics (fill rate, fulfilled rate)
KEY_RATE: Final[str] = "rate"
#: the prefix of the per-product column names (product_0, product_1, ...)
COL_PRODUCT_PREFIX: Final[str] = "product_"
#: the row key for the production time ("TRP") statistics
ROW_TRP: Final[str] = "trp"
#: the row key for the fill rate (immediately satisfied demands)
ROW_FILL_RATE: Final[str] = f"fill{SCOPE_SEPARATOR}{KEY_RATE}"
#: the row key for the customer waiting time ("CWT") statistics
ROW_CWT: Final[str] = "cwt"
#: the row key for the mean stock level
ROW_STOCK_LEVEL_MEAN: Final[str] = \
    f"stocklevel{SCOPE_SEPARATOR}{KEY_MEAN_ARITH}"
#: the row key for the overall fraction of fulfilled demands
ROW_FULFILLED_RATE: Final[str] = f"fulfilled{SCOPE_SEPARATOR}{KEY_RATE}"
#: the key prefix of the simulation time line; the value that follows it
#: is the simulation time in seconds (see :func:`to_stream`)
ROW_SIMULATION_TIME: Final[str] = \
    f"time{SCOPE_SEPARATOR}s{KEY_VALUE_SEPARATOR}"

#: the (name, getter) pairs of the statistics that we will print: one CSV
#: row is emitted per pair for both the TRP and the CWT statistics
_STATS: tuple[tuple[str, Callable[[
    StreamStatistics], int | float | None]], ...] = (
    (KEY_MINIMUM, StreamStatistics.getter_or_none(KEY_MINIMUM)),
    (KEY_MEAN_ARITH, StreamStatistics.getter_or_none(KEY_MEAN_ARITH)),
    (KEY_MAXIMUM, StreamStatistics.getter_or_none(KEY_MAXIMUM)),
    (KEY_STDDEV, StreamStatistics.getter_or_none(KEY_STDDEV)))

61 

62 

class Statistics:
    """
    A statistics record based on production scheduling.

    It provides the following statistics:

    - :attr:`~immediate_rates`: The per-product-fillrate, i.e., the fraction
      of demands of a given product that were immediately fulfilled when
      arriving in the system (i.e., that were fulfilled by using product that
      was available in the warehouse/in stock).
      Higher values are good.
    - :attr:`~immediate_rate`: The overall fillrate, i.e., the total fraction
      of demands that were immediately fulfilled upon arrival in the system
      over all demands. That is, this is the fraction of demands that were
      fulfilled by using product that was available in the warehouse/in stock.
      Higher values are good.
    - :attr:`~waiting_times`: The per-product waiting times ("CWT") for the
      demands that came in but could *not* immediately be fulfilled. These are
      the demands for a given product that were, so to say, not covered by the
      fillrate/:attr:`~immediate_rate`. If all demands of a product could
      immediately be satisfied, then this is `None`.
      Otherwise, smaller values are good.
    - :attr:`~waiting_time`: The overall waiting times ("CWT") for the demands
      that came in but could *not* immediately be fulfilled. These are all the
      demands for a given product that were, so to say, not covered by the
      fillrate/:attr:`~immediate_rate`. If all demands could immediately be
      satisfied, then this is `None`.
      Otherwise, smaller values are good.
    - :attr:`~production_times`: The per-product times that producing one unit
      of the product takes from the moment that a production job is created
      until it is completed. Smaller values of this "TRP" are better.
    - :attr:`~production_time`: The overall statistics on the times that
      producing one unit of any product takes from the moment that a
      production job is created until it is completed. Smaller values this
      "TRP" are better.
    - :attr:`~fulfilled_rates`: The per-product fraction of demands that were
      satisfied. Demands for a product may remain unsatisfied if they have not
      been satisfied by the end of the simulation period. Larger values are
      better.
    - :attr:`~fulfilled_rate`: The fraction of demands that were satisfied.
      Demands may remain unsatisfied if they have not been satisfied by the
      end of the simulation period. Larger values are better.
    - :attr:`~stock_levels`: The average amount of a given product in the
      warehouse averaged over the simulation time. Smaller values are better.
    - :attr:`~stock_level`: The total average amount units of any product in
      the warehouse averaged over the simulation time. Smaller values are
      better.
    - :attr:`~simulation_time_nanos`: The total time that the simulation took,
      measured in nanoseconds.

    Instances of this class are filled by
    :class:`~moptipyapps.prodsched.statistics_collector.StatisticsCollector`
    objects plugged into the
    :class:`~moptipyapps.prodsched.simulation.Simulation`.
    """

    def __init__(self, n_products: int) -> None:
        """
        Create the statistics record for a given number of products.

        All statistics fields start out as `None` and are filled in later.

        :param n_products: the number of products, at least 1
        """
        check_int_range(n_products, "n_products", 1, 1_000_000_000)
        #: the production time (TRP) statistics per-product
        self.production_times: Final[list[
            StreamStatistics | None]] = [None] * n_products
        #: the overall production time (TRP) statistics
        self.production_time: StreamStatistics | None = None
        #: the fraction of demands that were immediately satisfied,
        #: on a per-product basis, i.e., the fillrate
        self.immediate_rates: Final[list[int | float | None]] = (
            [None] * n_products)
        #: the overall fraction of immediately satisfied demands, i.e.,
        #: the fillrate
        self.immediate_rate: int | float | None = None
        #: the average waiting time for all demands that were not immediately
        #: satisfied -- only counting demands that were actually satisfied,
        #: i.e., the CWT
        self.waiting_times: Final[list[
            StreamStatistics | None]] = [None] * n_products
        #: the overall waiting time for all demands that were not immediately
        #: satisfied -- only counting demands that were actually satisfied,
        #: i.e., the CWT
        self.waiting_time: StreamStatistics | None = None
        #: the fraction of demands that were fulfilled, on a per-product basis
        self.fulfilled_rates: Final[list[
            int | float | None]] = [None] * n_products
        #: the fraction of demands that were fulfilled overall
        self.fulfilled_rate: int | float | None = None
        #: the average stock level, on a per-product basis
        self.stock_levels: Final[list[
            int | float | None]] = [None] * n_products
        #: the overall average stock level
        self.stock_level: int | float | None = None
        #: the nanoseconds used by the simulation
        self.simulation_time_nanos: int | float | None = None

    def __str__(self) -> str:
        """Convert this record to the multi-line string of :func:`to_stream`."""
        return "\n".join(to_stream(self))

    def clear(self) -> None:
        """
        Reset all statistics values of this record to `None`.

        The number of products remains unchanged.
        """
        n: Final[int] = list.__len__(self.production_times)
        if n <= 0:  # defensive: __init__ guarantees n >= 1
            raise ValueError(f"Invalid number of products {n}.")
        for i in range(n):
            self.production_times[i] = None
            self.immediate_rates[i] = None
            self.waiting_times[i] = None
            self.fulfilled_rates[i] = None
            self.stock_levels[i] = None

        self.production_time = None
        self.immediate_rate = None
        self.waiting_time = None
        self.fulfilled_rate = None
        self.stock_level = None
        self.simulation_time_nanos = None

    def copy_from(self, stat: "Statistics") -> None:
        """
        Copy the contents of another statistics record into this one.

        :param stat: the other statistics record; it must have the same
            number of products as this one
        :raises TypeError: if `stat` is not a :class:`Statistics` instance
        """
        if not isinstance(stat, Statistics):
            raise type_error(stat, "stat", Statistics)
        self.production_times[:] = stat.production_times
        self.production_time = stat.production_time
        self.immediate_rates[:] = stat.immediate_rates
        self.immediate_rate = stat.immediate_rate
        self.waiting_times[:] = stat.waiting_times
        self.waiting_time = stat.waiting_time
        self.fulfilled_rates[:] = stat.fulfilled_rates
        self.fulfilled_rate = stat.fulfilled_rate
        self.stock_levels[:] = stat.stock_levels
        self.stock_level = stat.stock_level
        self.simulation_time_nanos = stat.simulation_time_nanos

    def from_stream(self, stream: Iterable[str]) -> Self:
        """
        Load the data from a stream produced by :func:`to_stream`.

        Notice: The `n` values of the statistics records cannot be loaded.
        They will be lost and just set to some more or less random number.

        :param stream: the stream of data lines
        :return: this object
        :raises ValueError: if a required row is missing or duplicated, or
            if the simulation time line is absent
        """
        self.clear()

        n: Final[int] = list.__len__(self.production_times)
        if n <= 0:  # defensive: __init__ guarantees n >= 1
            raise ValueError(f"Invalid number of products {n}.")

        # the set of CSV row keys that still need to be found
        keys: Final[set[str]] = {
            f"{key}{SCOPE_SEPARATOR}{the_stat[0]}"
            for key in (ROW_TRP, ROW_CWT) for the_stat in _STATS}
        keys.update((ROW_FILL_RATE, ROW_FULFILLED_RATE,
                     ROW_STOCK_LEVEL_MEAN))
        sim_time_key: Final[str] = ROW_SIMULATION_TIME

        data: dict[str, list[int | float | None]] = {}
        sim_time: int | None = None
        for srow in stream:
            row = str.strip(srow)
            if row.startswith(sim_time_key):
                # the line stores the simulation time in seconds:
                # convert it back to integer nanoseconds
                sim_time = check_int_range(
                    round(float(row[str.__len__(
                        sim_time_key):]) * 1_000_000_000),
                    sim_time_key, 0, 1_000_000_000_000_000_000_000_000)
                if set.__len__(keys) <= 0:
                    break
                continue

            # FIX: split the *stripped* row, not the raw `srow`: the raw
            # line may carry a trailing newline (glued to the last column)
            # or leading whitespace (corrupting the key column)
            cols: list[str] = str.split(row, CSV_SEPARATOR)
            key: str = cols[0]
            # a valid data row has at least 1 key + 1 total + n products
            if (list.__len__(cols) <= (n + 1)) or (key not in keys):
                continue
            if key in data:
                raise ValueError(f"Duplicate key '{key}'.")
            # index 0 is the total value, indices 1..n the per-product ones
            data[key] = [str_to_num_or_none(cols[i]) for i in range(1, n + 2)]
            keys.remove(key)
            if (set.__len__(keys) <= 0) and (sim_time is not None):
                break

        if set.__len__(keys) > 0:
            raise ValueError(f"Missing keys: {keys}")
        if sim_time is None:
            raise ValueError(f"Did not find key '{sim_time_key}'.")

        self.simulation_time_nanos = sim_time
        self.production_time = _split_data_stat(
            data, ROW_TRP, self.production_times)
        self.waiting_time = _split_data_stat(
            data, ROW_CWT, self.waiting_times)

        vals: list[int | float | None] = data[ROW_FILL_RATE]
        self.immediate_rate = vals[0]
        self.immediate_rates[:] = vals[1:]

        vals = data[ROW_STOCK_LEVEL_MEAN]
        self.stock_level = vals[0]
        self.stock_levels[:] = vals[1:]

        vals = data[ROW_FULFILLED_RATE]
        self.fulfilled_rate = vals[0]
        self.fulfilled_rates[:] = vals[1:]

        return self

274 

275 

def _split_data_stat(data: dict[str, list[int | float | None]],
                     key: str,
                     dest: list[StreamStatistics | None]) \
        -> StreamStatistics | None:
    """
    Split a loaded data set into per-product and overall statistics.

    :param data: the data set mapping CSV row keys to parsed value lists,
        where index 0 holds the overall value and indices `1..n` hold the
        per-product values
    :param key: the base row key (e.g., :const:`ROW_TRP` or
        :const:`ROW_CWT`) whose min/mean/max/sd rows should be extracted
    :param dest: the destination list receiving one per-product record
    :return: the main (overall) statistics, or `None` if not present
    """
    key_min: Final[str] = f"{key}{SCOPE_SEPARATOR}{KEY_MINIMUM}"
    key_mean: Final[str] = f"{key}{SCOPE_SEPARATOR}{KEY_MEAN_ARITH}"
    key_max: Final[str] = f"{key}{SCOPE_SEPARATOR}{KEY_MAXIMUM}"
    key_sd: Final[str] = f"{key}{SCOPE_SEPARATOR}{KEY_STDDEV}"
    # per-product values sit at indices 1..len(dest) of the value lists
    for i in range(1, list.__len__(dest) + 1):
        dest[i - 1] = __stream_stats(data, key_min, key_mean, key_max,
                                     key_sd, i)
    # index 0 holds the overall statistics
    return __stream_stats(data, key_min, key_mean, key_max, key_sd, 0)

295 

296 

def __stream_stats(data: dict[str, list[int | float | None]],
                   key_min: str, key_mean: str, key_max: str, key_sd: str,
                   i: int) -> StreamStatistics | None:
    """
    Construct a single stream statistics record from parsed row data.

    :param data: the data array mapping row keys to value lists
    :param key_min: the minimum key
    :param key_mean: the mean key
    :param key_max: the maximum key
    :param key_sd: the standard deviation key
    :param i: the index into the value lists (0 = overall, `>0` = product)
    :return: the statistics or `None` if no values exist at index `i`
    """
    minimum = data[key_min][i]
    mean = data[key_mean][i]
    maximum = data[key_max][i]
    stddev = data[key_sd][i]
    if (minimum is None) or (maximum is None):
        return None  # no statistics were recorded for this slot
    if mean is None:
        raise ValueError(
            f"Invalid mean {mean} for min={minimum}, max={maximum}!")
    # the true sample count was lost during serialization: use n=1 when no
    # standard deviation exists (single sample), an arbitrary 100 otherwise
    return StreamStatistics(
        n=1 if stddev is None else 100, minimum=minimum,
        mean_arith=mean, maximum=maximum, stddev=stddev)

323 

324 

def to_stream(stats: Statistics) -> Generator[str, None, None]:
    """
    Write a statistics record to a stream of CSV lines.

    The first yielded line is the header row, followed by one row per
    statistic (total value first, then one value per product), and finally
    the simulation time line (in seconds).

    :param stats: the statistics record
    :return: the stream of data lines
    """
    n_products: Final[int] = list.__len__(stats.production_times)
    nts: Final[Callable[[int | float | None], str]] = num_or_none_to_str

    # the header row: stat;total;product_0;product_1;...
    yield str.join(CSV_SEPARATOR, chain((
        COL_STAT, COL_TOTAL), (f"{COL_PRODUCT_PREFIX}{i}" for i in range(
            n_products))))

    # one row per (statistic, getter) pair for both the TRP and CWT records
    for key, alle, single in (
            (ROW_TRP, stats.production_times, stats.production_time),
            (ROW_CWT, stats.waiting_times, stats.waiting_time)):
        for stat, call in _STATS:
            yield str.join(CSV_SEPARATOR, chain((
                f"{key}{SCOPE_SEPARATOR}{stat}", nts(call(single))), (
                map(nts, map(call, alle)))))
    yield str.join(CSV_SEPARATOR, chain((
        ROW_FILL_RATE, nts(stats.immediate_rate)), (
        map(nts, stats.immediate_rates))))
    yield str.join(CSV_SEPARATOR, chain((
        ROW_STOCK_LEVEL_MEAN, nts(stats.stock_level)), (
        map(nts, stats.stock_levels))))
    yield str.join(CSV_SEPARATOR, chain((
        ROW_FULFILLED_RATE, nts(stats.fulfilled_rate)), (
        map(nts, stats.fulfilled_rates))))

    # FIX: the simulation time may still be `None` on a freshly created
    # record; dividing `None` by an int raised a TypeError (e.g. inside
    # `Statistics.__str__`). `num_or_none_to_str` handles `None` itself.
    nanos: Final[int | float | None] = stats.simulation_time_nanos
    yield (f"{ROW_SIMULATION_TIME}"
           f"{nts(None if nanos is None else nanos / 1_000_000_000)}")
356 stats.simulation_time_nanos / 1_000_000_000)}"