Coverage for moptipyapps / prodsched / statistics_collector.py: 96%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-30 03:25 +0000

1r""" 

2A tool for collecting statistics from an MFC simulation. 

3 

4A statistics collector :class:`~StatisticsCollector` is a special 

5:class:`~moptipyapps.prodsched.simulation.Listener` 

6that can be plugged into a 

7:class:`~moptipyapps.prodsched.simulation.Simulation`. 

8During the execution of the simulation, it gathers statistics about what is 

9going on. It finally stores these into a 

10:class:`~moptipyapps.prodsched.statistics.Statistics` record. 

11Such a record can then be used to understand the key characteristics of the 

12behavior of the simulation on a given 

13:class:`~moptipyapps.prodsched.instance.Instance`. 

14 

15The simulation listeners (:class:`~moptipyapps.prodsched.simulation.Listener`) 

16offer a method to pipe out data from arbitrary subclasses of 

17:class:`~moptipyapps.prodsched.simulation.Simulation`. 

18This means that they allow us to access data in a unified way, regardless of 

19which manufacturing logic or production scheduling we actually implement. 

20 

21In the case of the :class:`~StatisticsCollector` implemented here, we 

22implement the :class:`~moptipyapps.prodsched.simulation.Listener`-API to 

23fill a :class:`~moptipyapps.prodsched.statistics.Statistics` record with data. 

24Such records offer the standard statistics that Thürer et al. used in their 

25works. 

26In other words, we can make such statistics available and accessible, 

27regardless of how our simulation schedules the production. 

28 

29Moreover, multiple such :class:`~moptipyapps.prodsched.statistics.Statistics` 

30records, filled with data from simulations over multiple different instances 

31(:mod:`~moptipyapps.prodsched.instance`) can be combined in a 

32:class:`~moptipyapps.prodsched.multistatistics.MultiStatistics` record. 

33This record, which will comprehensively represent performance over several 

34independent instances, then can be used as basis for objective functions 

35(:class:`~moptipy.api.objective.Objective`) such as those given in 

36:mod:`~moptipyapps.prodsched.objectives`. 

37 

38>>> instance = Instance( 

39... name="test2", n_products=2, n_customers=1, n_stations=2, n_demands=5, 

40... time_end_warmup=21, time_end_measure=10000, 

41... routes=[[0, 1], [1, 0]], 

42... demands=[[0, 0, 1, 10, 20, 90], [1, 0, 0, 5, 22, 200], 

43... [2, 0, 1, 7, 30, 200], [3, 0, 1, 6, 60, 200], 

44... [4, 0, 0, 125, 5, 2000]], 

45... warehous_at_t0=[2, 1], 

46... station_product_unit_times=[[[10.0, 50.0, 15.0, 100.0], 

47... [ 5.0, 20.0, 7.0, 35.0, 4.0, 50.0]], 

48... [[ 5.0, 24.0, 7.0, 80.0], 

49... [ 3.0, 21.0, 6.0, 50.0,]]]) 

50 

51 

52>>> from moptipyapps.prodsched.simulation import Simulation 

53>>> statistics = Statistics(instance.n_products) 

54>>> collector = StatisticsCollector(instance) 

55>>> collector.set_dest(statistics) 

56>>> simulation = Simulation(instance, collector) 

57>>> simulation.ctrl_run() 

58>>> print("\n".join(str(statistics).split("\n")[:-1])) 

59stat;total;product_0;product_1 

60trp.min;229.57142857142858;455.6;229.57142857142858 

61trp.mean;304.8888888888889;455.6;246.92307692307693 

62trp.max;455.6;455.6;267.1666666666667 

63trp.sd;97.56326157594913;0;19.50721118346466 

64cwt.min;1603;2278;1603 

65cwt.mean;1829.3333333333333;2278;1605 

66cwt.max;2278;2278;1607 

67cwt.sd;388.56187838403986;;2.8284271247461903 

68fill.rate;0;0;0 

69stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118 

70fulfilled.rate;1;1;1 

71""" 

72 

73from time import time_ns 

74from typing import Final 

75 

76from pycommons.math.stream_statistics import ( 

77 StreamStatistics, 

78 StreamStatisticsAggregate, 

79) 

80from pycommons.math.streams import StreamSum 

81from pycommons.types import type_error 

82 

83from moptipyapps.prodsched.instance import ( 

84 Demand, 

85 Instance, 

86) 

87from moptipyapps.prodsched.simulation import Job, Listener 

88from moptipyapps.prodsched.statistics import Statistics 

89 

90 

class StatisticsCollector(Listener):
    """
    A simulation listener that accumulates basic performance statistics.

    The collector is notified about warehouse changes, finished production
    steps, and satisfied demands. Once the simulation has finished, it
    writes the aggregated values into the
    :class:`~moptipyapps.prodsched.statistics.Statistics` record that was
    registered via :meth:`set_dest`.
    """

    def __init__(self, instance: Instance) -> None:
        """
        Create the collector for a given problem instance.

        :param instance: the instance whose simulation is observed
        :raises TypeError: if `instance` is not an
            :class:`~moptipyapps.prodsched.instance.Instance`
        """
        if not isinstance(instance, Instance):
            raise type_error(instance, "instance", Instance)

        count: Final[int] = instance.n_products

        #: the record that receives the results, `None` until set
        self.__dest: Statistics | None = None
        #: the wall-clock time (in ns) at which the simulation started
        self.__start: int = -1
        #: the number of products
        self.__n_products: Final[int] = count
        #: the time at which the warmup period ends
        self.__warmup: Final[float] = instance.time_end_warmup
        #: the time at which the measurement period ends
        self.__total: Final[float] = instance.time_end_measure
        #: the number of measurable demands, per product
        self.__n_mdpb: Final[tuple[int, ...]] = (
            instance.n_measurable_demands_per_product)

        #: the production time statistics, per product
        self.__production_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(count))
        #: the production time statistics over all products
        self.__production_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()

        #: the waiting time statistics of the demands that could not be
        #: satisfied immediately, per product
        self.__waiting_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(count))
        #: the waiting time statistics of the demands that could not be
        #: satisfied immediately, over all products
        self.__waiting_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()

        #: the number of immediately satisfied demands, per product
        self.__immediately_satisfied: Final[list[int]] = [
            0 for _ in range(count)]
        #: the number of satisfied demands, per product
        self.__fulfilled: Final[list[int]] = [0 for _ in range(count)]

        #: the sums of (amount * duration) of stored units, per product
        self.__stock_levels: Final[tuple[StreamSum, ...]] = tuple(
            StreamSum() for _ in range(count))
        #: the sum of (amount * duration) of stored units, all products
        self.__stock_level: Final[StreamSum] = StreamSum()
        #: per product: `[amount in warehouse, time of the last change]`
        self.__in_warehouse: Final[tuple[list[float], ...]] = tuple(
            [0.0, 0.0] for _ in range(count))

    def set_dest(self, dest: Statistics) -> None:
        """
        Register the statistics record that will receive the results.

        :param dest: the record to be filled by :meth:`finished`
        :raises TypeError: if `dest` is not a
            :class:`~moptipyapps.prodsched.statistics.Statistics` record
        """
        if isinstance(dest, Statistics):
            self.__dest = dest
            return
        raise type_error(dest, "dest", Statistics)

    def start(self) -> None:
        """
        Reset all collected data and remember the start time.

        :raises ValueError: if no destination record was set before
        """
        if self.__dest is None:
            raise ValueError("Need destination statistics!")
        self.__start = time_ns()

        # Clear the aggregates, both the overall and the per-product ones.
        for aggregate in (self.__production_time, self.__waiting_time,
                          *self.__production_times,
                          *self.__waiting_times):
            aggregate.reset()
        for accumulator in (self.__stock_level, *self.__stock_levels):
            accumulator.reset()

        # Clear the plain counters and the warehouse bookkeeping.
        for pid, record in enumerate(self.__in_warehouse):
            record[0] = record[1] = 0.0
            self.__immediately_satisfied[pid] = 0
            self.__fulfilled[pid] = 0

    def product_in_warehouse(
            self, time: float, product_id: int, amount: int,
            is_in_measure_period: bool) -> None:
        """
        Record that the stored amount of a product has changed.

        :param time: the time at which the change happens
        :param product_id: the ID of the affected product
        :param amount: the new absolute amount of this product in the
            warehouse
        :param is_in_measure_period: `True` if the event lies within the
            measurement period, `False` otherwise
        """
        record: Final[list[float]] = self.__in_warehouse[product_id]
        if is_in_measure_period:
            # The previous amount was held since the last change, but only
            # the part after the warmup period is counted.
            held_since = max(record[1], self.__warmup)
            contribution: float = (time - held_since) * record[0]
            self.__stock_levels[product_id].add(contribution)
            self.__stock_level.add(contribution)
        record[0], record[1] = amount, time

    def produce_at_end(
            self, time: float, station_id: int, job: Job) -> None:
        """
        Record that a station has finished a production step of a job.

        Only jobs flagged as both `measure` and `completed` are counted.

        :param time: the time at which the step ended
        :param station_id: the ID of the station
        :param job: the job that was processed
        """
        if not (job.measure and job.completed):
            return

        units: Final[int] = job.amount
        elapsed: float = time - job.arrival
        repetitions: int = 1
        if units > 1:
            # Multi-unit job: record the per-unit time once for each unit.
            elapsed /= units
            repetitions = units

        per_product = self.__production_times[job.product_id]
        overall = self.__production_time
        for _ in range(repetitions):
            per_product.add(elapsed)
            overall.add(elapsed)

    def demand_satisfied(
            self, time: float, demand: Demand) -> None:
        """
        Record that a demand has been satisfied.

        Demands that are not flagged as `measure` are ignored.

        :param time: the time at which the demand was satisfied
        :param demand: the satisfied demand
        """
        if not demand.measure:
            return

        arrival: Final[float] = demand.arrival
        product: Final[int] = demand.product_id
        self.__fulfilled[product] += 1
        if arrival >= time:
            # No waiting was needed: this counts towards the fill rate.
            self.__immediately_satisfied[product] += 1
            return

        waited: Final[float] = time - arrival
        self.__waiting_times[product].add(waited)
        self.__waiting_time.add(waited)

    def finished(self, time: float) -> None:
        """
        Write everything that was collected into the destination record.

        :param time: the time at which the simulation finished; this
            implementation does not use it, the last stock interval is
            closed at the end of the measurement period instead
        :raises ValueError: if there is no destination record
        """
        dest: Final[Statistics | None] = self.__dest
        if dest is None:
            raise ValueError("Lost destination statistics record?")

        # production times
        for pid, aggregate in enumerate(self.__production_times):
            dest.production_times[pid] = aggregate.result_or_none()
        dest.production_time = self.__production_time.result_or_none()

        # immediate (fill) rates and fulfilled rates
        demands_per_product: Final[tuple[int, ...]] = self.__n_mdpb
        all_immediate: int = 0
        all_fulfilled: int = 0
        all_demands: int = 0
        for pid, (immediate, fulfilled) in enumerate(zip(
                self.__immediately_satisfied, self.__fulfilled)):
            demands = demands_per_product[pid]
            dest.immediate_rates[pid] = immediate / demands
            dest.fulfilled_rates[pid] = fulfilled / demands
            all_immediate += immediate
            all_fulfilled += fulfilled
            all_demands += demands
        dest.immediate_rate = all_immediate / all_demands
        dest.fulfilled_rate = all_fulfilled / all_demands

        # waiting times
        for pid, aggregate in enumerate(self.__waiting_times):
            dest.waiting_times[pid] = aggregate.result_or_none()
        dest.waiting_time = self.__waiting_time.result_or_none()

        # mean stock levels over the measurement window
        end: Final[float] = self.__total
        warmup: Final[float] = self.__warmup
        window: Final[float] = end - warmup
        overall: Final[StreamSum] = self.__stock_level
        for pid, (accumulator, record) in enumerate(zip(
                self.__stock_levels, self.__in_warehouse)):
            # Close the last interval: the current amount is held until
            # the end of the measurement period.
            tail: float = (end - max(record[1], warmup)) * record[0]
            accumulator.add(tail)
            overall.add(tail)
            dest.stock_levels[pid] = accumulator.result() / window
        dest.stock_level = overall.result() / window

        dest.simulation_time_nanos = time_ns() - self.__start