Coverage for moptipyapps / prodsched / statistics_collector.py: 94%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 04:40 +0000

1r""" 

2A tool for collecting statistics from an MFC simulation. 

3 

4A statistics collector is a special 

5:class:`~moptipyapps.prodsched.simulation.Listener` 

6that can be plugged into a 

7:class:`~moptipyapps.prodsched.simulation.Simulation`. 

8During the execution of the simulation, it gathers statistics about what is 

9going on. It finally stores these into a 

10:class:`~moptipyapps.prodsched.statistics.Statistics` record. 

11Such a record can then be used to understand the key characteristics of the 

12behavior of the simulation on a given 

13:class:`~moptipyapps.prodsched.instance.Instance`. 

14 

15>>> instance = Instance( 

16... name="test2", n_products=2, n_customers=1, n_stations=2, n_demands=5, 

17... time_end_warmup=21, time_end_measure=10000, 

18... routes=[[0, 1], [1, 0]], 

19... demands=[[0, 0, 1, 10, 20, 90], [1, 0, 0, 5, 22, 200], 

20... [2, 0, 1, 7, 30, 200], [3, 0, 1, 6, 60, 200], 

21... [4, 0, 0, 125, 5, 2000]], 

22... warehous_at_t0=[2, 1], 

23... station_product_unit_times=[[[10.0, 50.0, 15.0, 100.0], 

24... [ 5.0, 20.0, 7.0, 35.0, 4.0, 50.0]], 

25... [[ 5.0, 24.0, 7.0, 80.0], 

26... [ 3.0, 21.0, 6.0, 50.0,]]]) 

27 

28 

29>>> from moptipyapps.prodsched.simulation import Simulation 

30>>> statistics = Statistics(instance.n_products) 

31>>> collector = StatisticsCollector(instance) 

32>>> collector.set_dest(statistics) 

33>>> simulation = Simulation(instance, collector) 

34>>> simulation.ctrl_run() 

35>>> print("\n".join(str(statistics).split("\n")[:-1])) 

36stat;total;product_0;product_1 

37trp.min;229.57142857142858;455.6;229.57142857142858 

38trp.mean;304.8888888888889;455.6;246.92307692307693 

39trp.max;455.6;455.6;267.1666666666667 

40trp.sd;97.56326157594913;0;19.50721118346466 

41cwt.min;1603;2278;1603 

42cwt.mean;1829.3333333333333;2278;1605 

43cwt.max;2278;2278;1607 

44cwt.sd;388.56187838403986;;2.8284271247461903 

45fill.rate;0;0;0 

46stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118 

47fulfilled.rate;1;1;1 

48""" 

49 

50from time import time_ns 

51from typing import Final 

52 

53from pycommons.math.stream_statistics import ( 

54 StreamStatistics, 

55 StreamStatisticsAggregate, 

56) 

57from pycommons.math.streams import StreamSum 

58from pycommons.types import type_error 

59 

60from moptipyapps.prodsched.instance import ( 

61 Demand, 

62 Instance, 

63) 

64from moptipyapps.prodsched.simulation import Job, Listener 

65from moptipyapps.prodsched.statistics import Statistics 

66 

67 

class StatisticsCollector(Listener):
    """
    A simulation listener that gathers statistics during a run.

    While the simulation executes, the collector accumulates per-product
    and overall production times, waiting times, fulfillment counters, and
    time-weighted stock levels. Once the simulation reports that it has
    finished, the results are written into the
    :class:`~moptipyapps.prodsched.statistics.Statistics` record that was
    registered via :meth:`set_dest`.
    """

    def __init__(self, instance: Instance) -> None:
        """
        Create the collector for a given problem instance.

        If `instance` is not an :class:`Instance`, the error created by
        :func:`type_error` is raised.

        :param instance: the instance whose simulation will be observed
        """
        if not isinstance(instance, Instance):
            raise type_error(instance, "instance", Instance)

        n_products: Final[int] = instance.n_products

        #: the record into which the results are written, if any was set
        self.__dest: Statistics | None = None
        #: the time at which the warmup period ends
        self.__warmup: Final[float] = instance.time_end_warmup
        #: the time at which the measurement period ends
        self.__total: Final[float] = instance.time_end_measure
        #: the number of products
        self.__n_products: Final[int] = n_products
        #: the number of measurable demands, per product
        self.__n_mdpb: Final[tuple[int, ...]] = (
            instance.n_measurable_demands_per_product)

        #: the production time statistics, per product
        self.__production_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(n_products))
        #: the production time statistics over all products
        self.__production_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of immediately satisfied measured demands, per product
        self.__immediately_satisfied: Final[list[int]] = [0] * n_products
        #: the waiting time statistics of the measured demands that were
        #: not satisfied immediately, per product
        self.__waiting_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(n_products))
        #: the waiting time statistics over all products
        self.__waiting_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of satisfied measured demands, per product
        self.__fulfilled: Final[list[int]] = [0] * n_products
        #: the sums of `duration * amount` of the stock, per product
        self.__stock_levels: Final[tuple[StreamSum, ...]] = tuple(
            StreamSum() for _ in range(n_products))
        #: the sum of `duration * amount` of the stock over all products
        self.__stock_level: Final[StreamSum] = StreamSum()
        #: for each product, a two-element list holding the amount that is
        #: currently in the warehouse (index 0) and the time at which this
        #: amount was reported (index 1)
        self.__in_warehouse: Final[tuple[list[float], ...]] = tuple([
            0.0, 0.0] for _ in range(n_products))
        #: the start time of the simulation, in nanoseconds
        self.__start: int = -1

    def set_dest(self, dest: Statistics) -> None:
        """
        Register the statistics record that receives the results.

        If `dest` is not a :class:`Statistics` instance, the error created
        by :func:`type_error` is raised.

        :param dest: the destination record
        """
        if isinstance(dest, Statistics):
            self.__dest = dest
            return
        raise type_error(dest, "dest", Statistics)

    def start(self) -> None:
        """
        Reset all collected data and remember the current clock time.

        :raises ValueError: if no destination record was set before
        """
        if self.__dest is None:
            raise ValueError("Need destination statistics!")
        self.__start = time_ns()

        # All per-product containers were created with the same length in
        # the initializer, so they can be walked in lockstep.
        per_product = zip(
            self.__production_times, self.__waiting_times,
            self.__stock_levels, self.__in_warehouse, strict=True)
        for pid, (produced, waited, stock, warehouse) in enumerate(
                per_product):
            produced.reset()
            self.__immediately_satisfied[pid] = 0
            waited.reset()
            self.__fulfilled[pid] = 0
            stock.reset()
            warehouse[0] = 0.0
            warehouse[1] = 0.0

        # finally clear the accumulators spanning all products
        self.__production_time.reset()
        self.__waiting_time.reset()
        self.__stock_level.reset()

    def product_in_warehouse(
            self, time: float, product_id: int, amount: int,
            is_in_measure_period: bool) -> None:
        """
        Record that the stored amount of a product has changed.

        :param time: the current time
        :param product_id: the ID of the product
        :param amount: the new absolute amount of the product that is now
            in the warehouse
        :param is_in_measure_period: `True` if the event takes place inside
            the measurement period, `False` otherwise
        """
        record: Final[list[float]] = self.__in_warehouse[product_id]
        if is_in_measure_period:
            # The previous amount was held from the later of its report
            # time and the end of the warmup until now.
            held_since: float = max(record[1], self.__warmup)
            weighted: float = (time - held_since) * record[0]
            self.__stock_levels[product_id].add(weighted)
            self.__stock_level.add(weighted)
        record[0] = amount
        record[1] = time

    def produce_at_end(
            self, time: float, station_id: int, job: Job) -> None:
        """
        Record that a station has finished working on a production job.

        Only jobs that are both measured and completed contribute to the
        production time statistics.

        :param time: the current time
        :param station_id: the ID of the station
        :param job: the production job
        """
        if not (job.measure and job.completed):
            return

        units: Final[int] = job.amount
        elapsed: Final[float] = time - job.arrival
        per_product: Final = self.__production_times[job.product_id]
        overall: Final = self.__production_time

        if units <= 1:
            per_product.add(elapsed)
            overall.add(elapsed)
            return

        # A job of several units contributes one sample per unit, each
        # carrying an equal share of the elapsed time.
        share: Final[float] = elapsed / units
        for _ in range(units):
            per_product.add(share)
            overall.add(share)

    def demand_satisfied(
            self, time: float, demand: Demand) -> None:
        """
        Record that a customer demand has been satisfied.

        Demands that are not marked as measured are ignored.

        :param time: the time at which the demand was satisfied
        :param demand: the satisfied demand
        """
        if not demand.measure:
            return

        arrival: Final[float] = demand.arrival
        product: Final[int] = demand.product_id
        if arrival >= time:
            # satisfied no later than its arrival: counts as immediate
            self.__immediately_satisfied[product] += 1
        else:
            waited: Final[float] = time - arrival
            self.__waiting_times[product].add(waited)
            self.__waiting_time.add(waited)
        self.__fulfilled[product] += 1

    def finished(self, time: float) -> None:
        """
        Write all collected statistics into the destination record.

        The stock levels are integrated up to the end of the measurement
        period of the instance; the `time` argument is not used for this.

        :param time: the time at which the simulation has finished
        :raises ValueError: if there is no destination record
        """
        record: Final[Statistics | None] = self.__dest
        if record is None:
            raise ValueError("Lost destination statistics record?")

        # production times: per product, then over all products
        for pid, produced in enumerate(self.__production_times):
            record.production_times[pid] = produced.result_or_none()
        record.production_time = self.__production_time.result_or_none()

        # rates of immediately satisfied and of satisfied demands,
        # relative to the number of measurable demands
        measurable: Final[tuple[int, ...]] = self.__n_mdpb
        sum_immediate: int = 0
        sum_fulfilled: int = 0
        sum_measurable: int = 0
        for pid, n_immediate in enumerate(self.__immediately_satisfied):
            n_measurable = measurable[pid]
            n_fulfilled = self.__fulfilled[pid]
            record.immediate_rates[pid] = n_immediate / n_measurable
            record.fulfilled_rates[pid] = n_fulfilled / n_measurable
            sum_immediate += n_immediate
            sum_fulfilled += n_fulfilled
            sum_measurable += n_measurable
        record.immediate_rate = sum_immediate / sum_measurable
        record.fulfilled_rate = sum_fulfilled / sum_measurable

        # waiting times: per product, then over all products
        for pid, waited in enumerate(self.__waiting_times):
            record.waiting_times[pid] = waited.result_or_none()
        record.waiting_time = self.__waiting_time.result_or_none()

        # stock levels: add the amount held from the last report until
        # the end of the measurement period, then divide the sums by the
        # length of the measurement window
        measure_end: Final[float] = self.__total
        warmup_end: Final[float] = self.__warmup
        window: Final[float] = measure_end - warmup_end
        overall: Final[StreamSum] = self.__stock_level
        for pid, (stock, warehouse) in enumerate(zip(
                self.__stock_levels, self.__in_warehouse, strict=True)):
            amount, since = warehouse
            tail: float = (measure_end - max(since, warmup_end)) * amount
            stock.add(tail)
            overall.add(tail)
            record.stock_levels[pid] = stock.result() / window
        record.stock_level = overall.result() / window

        record.simulation_time_nanos = time_ns() - self.__start