Coverage for moptipyapps / prodsched / statistics_collector.py: 96%

113 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 08:40 +0000

1r""" 

2A tool for collecting statistics from an MFC simulation. 

3 

4A statistics collector :class:`~StatisticsCollector` is a special 

5:class:`~moptipyapps.prodsched.simulation.Listener` 

6that can be plugged into a 

7:class:`~moptipyapps.prodsched.simulation.Simulation`. 

8During the execution of the simulation, it gathers statistics about what is 

9going on. It finally stores these into a 

10:class:`~moptipyapps.prodsched.statistics.Statistics` record. 

11Such a record can then be used to understand the key characteristics of the 

12behavior of the simulation on a given 

13:class:`~moptipyapps.prodsched.instance.Instance`. 

14 

15The simulation listeners (:class:`~moptipyapps.prodsched.simulation.Listener`) 

16offer a method to pipe out data from arbitrary subclasses of 

17:class:`~moptipyapps.prodsched.simulation.Simulation`. 

18This means that they allow us to access data in a unified way, regardless of 

19which manufacturing logic or production scheduling we actually implement. 

20 

21In the case of the :class:`~StatisticsCollector` implemented here, we 

22implement the :class:`~moptipyapps.prodsched.simulation.Listener`-API to 

23fill a :class:`~moptipyapps.prodsched.statistics.Statistics` record with data. 

24Such records offer the standard statistics that Thürer et al. used in their 

25works. 

26In other words, we can make such statistics available and accessible, 

27regardless of how our simulation schedules the production. 

28 

29Moreover, multiple such :class:`~moptipyapps.prodsched.statistics.Statistics` 

30records, filled with data from simulations over multiple different instances 

31(:mod:`~moptipyapps.prodsched.instance`) can be combined in a 

32:class:`~moptipyapps.prodsched.multistatistics.MultiStatistics` record. 

33This record, which will comprehensively represent performance over several 

34independent instances, then can be used as basis for objective functions 

35(:class:`~moptipy.api.objective.Objective`) such as those given in 

36:mod:`~moptipyapps.prodsched.objectives`. 

37 

38>>> instance = Instance( 

39... name="test2", n_products=2, n_customers=1, n_stations=2, n_demands=5, 

40... time_end_warmup=21, time_end_measure=10000, 

41... routes=[[0, 1], [1, 0]], 

42... demands=[[0, 0, 1, 10, 20, 90], [1, 0, 0, 5, 22, 200], 

43... [2, 0, 1, 7, 30, 200], [3, 0, 1, 6, 60, 200], 

44... [4, 0, 0, 125, 5, 2000]], 

45... warehous_at_t0=[2, 1], 

46... station_product_unit_times=[[[10.0, 50.0, 15.0, 100.0], 

47... [ 5.0, 20.0, 7.0, 35.0, 4.0, 50.0]], 

48... [[ 5.0, 24.0, 7.0, 80.0], 

49... [ 3.0, 21.0, 6.0, 50.0,]]]) 

50 

51 

52>>> from moptipyapps.prodsched.simulation import Simulation 

53>>> statistics = Statistics(instance.n_products) 

54>>> collector = StatisticsCollector(instance) 

55>>> collector.set_dest(statistics) 

56>>> simulation = Simulation(instance, collector) 

57>>> simulation.ctrl_run() 

58>>> from moptipyapps.prodsched.statistics import to_stream 

59>>> text = list(to_stream(statistics)) 

60>>> print("\n".join(text[:-1])) 

61stat;total;product_0;product_1 

62trp.min;229.57142857142858;455.6;229.57142857142858 

63trp.mean;304.8888888888889;455.6;246.92307692307693 

64trp.max;455.6;455.6;267.1666666666667 

65trp.sd;97.56326157594913;0;19.50721118346466 

66cwt.min;1603;2278;1603 

67cwt.mean;1829.3333333333333;2278;1605 

68cwt.max;2278;2278;1607 

69cwt.sd;388.56187838403986;;2.8284271247461903 

70fill.rate;0;0;0 

71stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118 

72fulfilled.rate;1;1;1 

73 

74>>> statistics_2 = Statistics(instance.n_products) 

75>>> statistics_2 = statistics_2.from_stream(text) 

76>>> print("\n".join(list(to_stream(statistics_2))[:-1])) 

77stat;total;product_0;product_1 

78trp.min;229.57142857142858;455.6;229.57142857142858 

79trp.mean;304.8888888888889;455.6;246.92307692307693 

80trp.max;455.6;455.6;267.1666666666667 

81trp.sd;97.56326157594913;0;19.50721118346466 

82cwt.min;1603;2278;1603 

83cwt.mean;1829.3333333333333;2278;1605 

84cwt.max;2278;2278;1607 

85cwt.sd;388.56187838403986;;2.8284271247461903 

86fill.rate;0;0;0 

87stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118 

88fulfilled.rate;1;1;1 

89""" 

90 

91from time import time_ns 

92from typing import Final 

93 

94from pycommons.math.stream_statistics import ( 

95 StreamStatistics, 

96 StreamStatisticsAggregate, 

97) 

98from pycommons.math.streams import StreamSum 

99from pycommons.types import type_error 

100 

101from moptipyapps.prodsched.instance import ( 

102 Demand, 

103 Instance, 

104) 

105from moptipyapps.prodsched.simulation import Job, Listener 

106from moptipyapps.prodsched.statistics import Statistics 

107 

108 

class StatisticsCollector(Listener):
    """
    A simulation listener that accumulates basic performance statistics.

    While a simulation runs, the collector gathers per-product and overall
    production times, customer waiting times, counts of immediately
    satisfied and of fulfilled demands, and time-weighted stock levels.
    When the simulation reports that it is finished, the results are
    written into the statistics record registered via :meth:`set_dest`.
    """

    def __init__(self, instance: Instance) -> None:
        """
        Create the collector for a given problem instance.

        If `instance` is not an :class:`Instance`, the exception created
        by `type_error` is raised.

        :param instance: the instance whose simulation will be observed
        """
        if not isinstance(instance, Instance):
            raise type_error(instance, "instance", Instance)
        #: the record to fill; `None` until :meth:`set_dest` was called
        self.__dest: Statistics | None = None
        #: the time at which the warmup period ends
        self.__warmup: Final[float] = instance.time_end_warmup
        #: the time at which the measurement period ends
        self.__total: Final[float] = instance.time_end_measure

        count: Final[int] = instance.n_products
        #: the number of products
        self.__n_products: Final[int] = count
        #: the number of measurable demands, per product
        self.__n_mdpb: Final[tuple[int, ...]] = (
            instance.n_measurable_demands_per_product)

        #: the production time statistics, one aggregate per product
        self.__production_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            [StreamStatistics.aggregate() for _ in range(count)])
        #: the production time statistics over all products
        self.__production_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of immediately satisfied demands, per product
        self.__immediately_satisfied: Final[list[int]] = [
            0 for _ in range(count)]
        #: the waiting time statistics of the demands that were not
        #: satisfied immediately, one aggregate per product
        self.__waiting_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            [StreamStatistics.aggregate() for _ in range(count)])
        #: the waiting time statistics of the demands that were not
        #: satisfied immediately, over all products
        self.__waiting_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of fulfilled demands, per product
        self.__fulfilled: Final[list[int]] = [0 for _ in range(count)]
        #: the sums of `amount * duration` of the stock, per product
        self.__stock_levels: Final[tuple[StreamSum, ...]] = tuple(
            [StreamSum() for _ in range(count)])
        #: the sum of `amount * duration` of the stock over all products
        self.__stock_level: Final[StreamSum] = StreamSum()
        #: per product, a two-element list holding the amount currently
        #: in the warehouse and the time when this amount was reported
        self.__in_warehouse: Final[tuple[list[float], ...]] = tuple(
            [[0.0, 0.0] for _ in range(count)])
        #: the start time of the simulation as returned by `time_ns`,
        #: `-1` as long as :meth:`start` was not yet invoked
        self.__start: int = -1

    def set_dest(self, dest: Statistics) -> None:
        """
        Register the statistics record that shall receive the results.

        If `dest` is not a :class:`Statistics` record, the exception
        created by `type_error` is raised.

        :param dest: the destination record
        """
        if isinstance(dest, Statistics):
            self.__dest = dest
            return
        raise type_error(dest, "dest", Statistics)

    def start(self) -> None:
        """
        Reset all collected data and remember the current system time.

        :raises ValueError: if no destination record was set before
        """
        if self.__dest is None:
            raise ValueError("Need destination statistics!")
        self.__start = time_ns()

        immediate: Final[list[int]] = self.__immediately_satisfied
        fulfilled: Final[list[int]] = self.__fulfilled
        # All per-product containers have one entry per product, so they
        # can be walked in lockstep.
        for idx, (produced, waited, stock, record) in enumerate(zip(
                self.__production_times, self.__waiting_times,
                self.__stock_levels, self.__in_warehouse)):
            produced.reset()
            immediate[idx] = 0
            waited.reset()
            fulfilled[idx] = 0
            stock.reset()
            record[0] = 0.0
            record[1] = 0.0

        # now the accumulators spanning all products
        self.__production_time.reset()
        self.__waiting_time.reset()
        self.__stock_level.reset()

    def product_in_warehouse(
            self, time: float, product_id: int, amount: int,
            is_in_measure_period: bool) -> None:
        """
        Record that the stored amount of a product has changed.

        If the event lies inside the measurement period, the amount that
        was stored until now is multiplied with the time for which it was
        stored (counted from the end of the warmup period at the earliest)
        and added to the stock level sums. In any case, the new amount and
        the current time are remembered for the next event.

        :param time: the current time
        :param product_id: the product ID
        :param amount: the new absolute total amount of that product in the
            warehouse
        :param is_in_measure_period: is this event inside the measurement
            period?
        """
        record: Final[list[float]] = self.__in_warehouse[product_id]
        if is_in_measure_period:
            held, since = record
            # time before the end of the warmup period does not count
            weighted: float = (time - max(since, self.__warmup)) * held
            self.__stock_levels[product_id].add(weighted)
            self.__stock_level.add(weighted)
        record[0], record[1] = amount, time

    def produce_at_end(
            self, time: float, station_id: int, job: Job) -> None:
        """
        Record the end of a production step of a job at a station.

        Only jobs that are flagged as measurable and as completed are
        counted. The time since the arrival of the job is divided by the
        number of produced units and then added once per unit, so that each
        unit contributes one sample to the production time statistics.

        :param time: the current time
        :param station_id: the station ID
        :param job: the production job
        """
        if not (job.measure and job.completed):
            return
        units: Final[int] = job.amount
        duration: float = time - job.arrival
        per_product = self.__production_times[job.product_id]
        overall = self.__production_time
        if units > 1:  # several units: one sample per unit
            share: Final[float] = duration / units
            for _ in range(units):
                per_product.add(share)
                overall.add(share)
            return
        per_product.add(duration)
        overall.add(duration)

    def demand_satisfied(
            self, time: float, demand: Demand) -> None:
        """
        Record that a demand was satisfied.

        Only measurable demands are counted. A demand whose arrival time is
        not before `time` counts as immediately satisfied. For all others,
        the time passed since the arrival enters the waiting time
        statistics. Either way, the demand is counted as fulfilled.

        :param time: the time index when the demand was satisfied
        :param demand: the demand that was satisfied
        """
        if not demand.measure:
            return
        arrival: Final[float] = demand.arrival
        product: Final[int] = demand.product_id
        if arrival >= time:
            self.__immediately_satisfied[product] += 1
        else:
            waited: Final[float] = time - arrival
            self.__waiting_times[product].add(waited)
            self.__waiting_time.add(waited)
        self.__fulfilled[product] += 1

    def finished(self, time: float) -> None:
        """
        Write the collected statistics into the destination record.

        The stock that is still in the warehouse is accounted for until the
        end of the measurement period of the instance; the `time` argument
        itself is not used. Since this adds the final stock contributions
        to the internal sums, :meth:`start` must be invoked before this
        collector is used for another simulation run.

        :param time: the time when we are finished
        :raises ValueError: if no destination record is set
        """
        target: Final[Statistics | None] = self.__dest
        if target is None:
            raise ValueError("Lost destination statistics record?")
        measurable: Final[tuple[int, ...]] = self.__n_mdpb

        # production times: per product, then overall
        for idx, produced in enumerate(self.__production_times):
            target.production_times[idx] = produced.result_or_none()
        target.production_time = self.__production_time.result_or_none()

        # The rates are relative to the number of measurable demands.
        all_immediate: int = 0
        all_fulfilled: int = 0
        all_measurable: int = 0
        for idx, (n_immediate, n_fulfilled) in enumerate(zip(
                self.__immediately_satisfied, self.__fulfilled)):
            n_measurable = measurable[idx]
            target.immediate_rates[idx] = n_immediate / n_measurable
            all_immediate += n_immediate
            target.fulfilled_rates[idx] = n_fulfilled / n_measurable
            all_fulfilled += n_fulfilled
            all_measurable += n_measurable
        target.immediate_rate = all_immediate / all_measurable
        target.fulfilled_rate = all_fulfilled / all_measurable

        # waiting times: per product, then overall
        for idx, waited in enumerate(self.__waiting_times):
            target.waiting_times[idx] = waited.result_or_none()
        target.waiting_time = self.__waiting_time.result_or_none()

        # Mean stock levels: add the contribution of what is still stored
        # at the end, then divide by the length of the measurement window.
        overall: Final[StreamSum] = self.__stock_level
        end: Final[float] = self.__total
        warmup: Final[float] = self.__warmup
        window: Final[float] = end - warmup
        for idx, (stock, record) in enumerate(zip(
                self.__stock_levels, self.__in_warehouse)):
            held, since = record
            remaining: float = (end - max(since, warmup)) * held
            stock.add(remaining)
            overall.add(remaining)
            target.stock_levels[idx] = stock.result() / window
        target.stock_level = overall.result() / window

        # the system time consumed since `start` was called
        target.simulation_time_nanos = time_ns() - self.__start