Coverage for moptipyapps / prodsched / statistics_collector.py: 94%
113 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 04:40 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 04:40 +0000
1r"""
2A tool for collecting statistics from an MFC simulation.
4A statistics collector is a special
5:class:`~moptipyapps.prodsched.simulation.Listener`
6that can be plugged into a
7:class:`~moptipyapps.prodsched.simulation.Simulation`.
8During the execution of the simulation, it gathers statistics about what is
9going on. It finally stores these into a
10:class:`~moptipyapps.prodsched.statistics.Statistics` record.
11Such a record can then be used to understand the key characteristics of the
12behavior of the simulation on a given
13:class:`~moptipyapps.prodsched.instance.Instance`.
15>>> instance = Instance(
16... name="test2", n_products=2, n_customers=1, n_stations=2, n_demands=5,
17... time_end_warmup=21, time_end_measure=10000,
18... routes=[[0, 1], [1, 0]],
19... demands=[[0, 0, 1, 10, 20, 90], [1, 0, 0, 5, 22, 200],
20... [2, 0, 1, 7, 30, 200], [3, 0, 1, 6, 60, 200],
21... [4, 0, 0, 125, 5, 2000]],
22... warehous_at_t0=[2, 1],
23... station_product_unit_times=[[[10.0, 50.0, 15.0, 100.0],
24... [ 5.0, 20.0, 7.0, 35.0, 4.0, 50.0]],
25... [[ 5.0, 24.0, 7.0, 80.0],
26... [ 3.0, 21.0, 6.0, 50.0,]]])
29>>> from moptipyapps.prodsched.simulation import Simulation
30>>> statistics = Statistics(instance.n_products)
31>>> collector = StatisticsCollector(instance)
32>>> collector.set_dest(statistics)
33>>> simulation = Simulation(instance, collector)
34>>> simulation.ctrl_run()
35>>> print("\n".join(str(statistics).split("\n")[:-1]))
36stat;total;product_0;product_1
37trp.min;229.57142857142858;455.6;229.57142857142858
38trp.mean;304.8888888888889;455.6;246.92307692307693
39trp.max;455.6;455.6;267.1666666666667
40trp.sd;97.56326157594913;0;19.50721118346466
41cwt.min;1603;2278;1603
42cwt.mean;1829.3333333333333;2278;1605
43cwt.max;2278;2278;1607
44cwt.sd;388.56187838403986;;2.8284271247461903
45fill.rate;0;0;0
46stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118
47fulfilled.rate;1;1;1
48"""
50from time import time_ns
51from typing import Final
53from pycommons.math.stream_statistics import (
54 StreamStatistics,
55 StreamStatisticsAggregate,
56)
57from pycommons.math.streams import StreamSum
58from pycommons.types import type_error
60from moptipyapps.prodsched.instance import (
61 Demand,
62 Instance,
63)
64from moptipyapps.prodsched.simulation import Job, Listener
65from moptipyapps.prodsched.statistics import Statistics
class StatisticsCollector(Listener):
    """
    A simulation listener that accumulates statistics.

    The collector receives the simulation callbacks implemented below,
    keeps running aggregates of production times, waiting times, fill
    counts and stock levels, and copies the results into the
    :class:`~moptipyapps.prodsched.statistics.Statistics` record that was
    registered via :meth:`set_dest` once :meth:`finished` is invoked.
    """

    def __init__(self, instance: Instance) -> None:
        """
        Create the collector for a given problem instance.

        :param instance: the instance whose simulation will be observed
        :raises TypeError: if `instance` is not an
            :class:`~moptipyapps.prodsched.instance.Instance`
        """
        if not isinstance(instance, Instance):
            raise type_error(instance, "instance", Instance)

        #: the record to fill, `None` until :meth:`set_dest` is called
        self.__dest: Statistics | None = None
        #: the time at which the warmup period ends
        self.__warmup: Final[float] = instance.time_end_warmup
        #: the time at which the measurement period ends
        self.__total: Final[float] = instance.time_end_measure
        count: Final[int] = instance.n_products
        #: the number of products
        self.__n_products: Final[int] = count
        #: the number of measurable demands, per product
        self.__n_mdpb: Final[tuple[int, ...]] = (
            instance.n_measurable_demands_per_product)

        #: the production time statistics, per product
        self.__production_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(count))
        #: the production time statistics over all products
        self.__production_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of immediately satisfied demands, per product
        self.__immediately_satisfied: Final[list[int]] = [0] * count
        #: the waiting time statistics of the demands that were not
        #: satisfied immediately, per product
        self.__waiting_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(count))
        #: the waiting time statistics of the demands that were not
        #: satisfied immediately, over all products
        self.__waiting_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of fulfilled demands, per product
        self.__fulfilled: Final[list[int]] = [0] * count
        #: the accumulated (duration * amount) stock sums, per product
        self.__stock_levels: Final[tuple[StreamSum, ...]] = tuple(
            StreamSum() for _ in range(count))
        #: the accumulated (duration * amount) stock sum over all products
        self.__stock_level: Final[StreamSum] = StreamSum()
        #: per product, a two-element list: index 0 is the amount last
        #: reported for the warehouse, index 1 the time of that report
        self.__in_warehouse: Final[tuple[list[float], ...]] = tuple(
            [0.0, 0.0] for _ in range(count))
        #: the start time of the simulation as a `time_ns` value; `-1`
        #: until :meth:`start` has been called
        self.__start: int = -1

    def set_dest(self, dest: Statistics) -> None:
        """
        Register the statistics record that :meth:`finished` will fill.

        :param dest: the destination record
        :raises TypeError: if `dest` is not a
            :class:`~moptipyapps.prodsched.statistics.Statistics`
        """
        if not isinstance(dest, Statistics):
            raise type_error(dest, "dest", Statistics)
        self.__dest = dest

    def start(self) -> None:
        """
        Reset all collected data and remember the start time stamp.

        :raises ValueError: if no destination record has been set
        """
        if self.__dest is None:
            raise ValueError("Need destination statistics!")
        # take the time stamp first, before doing the reset work
        self.__start = time_ns()

        for index in range(self.__n_products):
            self.__production_times[index].reset()
            self.__immediately_satisfied[index] = 0
            self.__waiting_times[index].reset()
            self.__fulfilled[index] = 0
            self.__stock_levels[index].reset()
            record = self.__in_warehouse[index]
            record[0] = 0.0
            record[1] = 0.0

        # finally reset the aggregates spanning all products
        self.__production_time.reset()
        self.__waiting_time.reset()
        self.__stock_level.reset()

    def product_in_warehouse(
            self, time: float, product_id: int, amount: int,
            is_in_measure_period: bool) -> None:
        """
        Record that the stored amount of a product has changed.

        If the event lies inside the measurement period, the previously
        reported amount multiplied by the time it was held (counted from
        no earlier than the end of the warmup) is added to the stock sums.
        The new amount and time are remembered in either case.

        :param time: the current time
        :param product_id: the ID of the product
        :param amount: the new absolute total amount of that product in
            the warehouse
        :param is_in_measure_period: does this event fall into the
            measurement period?
        """
        record: Final[list[float]] = self.__in_warehouse[product_id]
        if is_in_measure_period:
            held_since: float = max(record[1], self.__warmup)
            area: float = (time - held_since) * record[0]
            self.__stock_levels[product_id].add(area)
            self.__stock_level.add(area)
        record[0] = amount
        record[1] = time

    def produce_at_end(
            self, time: float, station_id: int, job: Job) -> None:
        """
        Record the end of a production step of a job at a station.

        Only jobs that are flagged as both measured and completed are
        counted. The time since the job's arrival is divided evenly among
        the produced units, so that every unit contributes one sample.

        :param time: the current time
        :param station_id: the ID of the station
        :param job: the production job
        """
        if not (job.measure and job.completed):
            return

        units: Final[int] = job.amount
        elapsed: float = time - job.arrival
        samples: int = 1
        if units > 1:  # several units: one sample of equal share each
            elapsed /= units
            samples = units

        per_product = self.__production_times[job.product_id]
        overall = self.__production_time
        for _ in range(samples):
            per_product.add(elapsed)
            overall.add(elapsed)

    def demand_satisfied(
            self, time: float, demand: Demand) -> None:
        """
        Record that a demand has been satisfied.

        Demands that are not flagged as measured are ignored. A demand
        satisfied no later than its arrival counts as immediately
        satisfied; otherwise its waiting time is added to the statistics.

        :param time: the time index at which the demand was satisfied
        :param demand: the satisfied demand
        """
        if not demand.measure:
            return

        arrival: Final[float] = demand.arrival
        product: Final[int] = demand.product_id
        if time <= arrival:
            self.__immediately_satisfied[product] += 1
        else:
            waited: Final[float] = time - arrival
            self.__waiting_times[product].add(waited)
            self.__waiting_time.add(waited)
        self.__fulfilled[product] += 1

    def finished(self, time: float) -> None:
        """
        Copy everything that was collected into the destination record.

        :param time: the time when the simulation finished
        :raises ValueError: if no destination record is set
        """
        dest: Final[Statistics | None] = self.__dest
        if dest is None:
            raise ValueError("Lost destination statistics record?")

        # production times: per product first, then the overall value
        for index, aggregate in enumerate(self.__production_times):
            dest.production_times[index] = aggregate.result_or_none()
        dest.production_time = self.__production_time.result_or_none()

        # immediate and fulfilled rates, relative to the number of
        # measurable demands
        # NOTE(review): a product without measurable demands would make
        # the divisions below raise ZeroDivisionError - confirm that the
        # instance rules this out.
        measurable: Final[tuple[int, ...]] = self.__n_mdpb
        sum_immediate: int = 0
        sum_fulfilled: int = 0
        sum_measurable: int = 0
        for index, immediate in enumerate(self.__immediately_satisfied):
            demands = measurable[index]
            dest.immediate_rates[index] = immediate / demands
            sum_immediate += immediate
            done = self.__fulfilled[index]
            dest.fulfilled_rates[index] = done / demands
            sum_fulfilled += done
            sum_measurable += demands
        dest.immediate_rate = sum_immediate / sum_measurable
        dest.fulfilled_rate = sum_fulfilled / sum_measurable

        # waiting times: per product first, then the overall value
        for index, aggregate in enumerate(self.__waiting_times):
            dest.waiting_times[index] = aggregate.result_or_none()
        dest.waiting_time = self.__waiting_time.result_or_none()

        # mean stock levels: add the stretch from the last reported
        # warehouse change to the end of the measurement period, then
        # divide the sums by the length of the measurement window
        # NOTE(review): presumably the last reported change is never
        # later than the end of the measurement period; otherwise this
        # final contribution would be negative - verify against the
        # simulation.
        end: Final[float] = self.__total
        warmup: Final[float] = self.__warmup
        window: Final[float] = end - warmup
        overall: Final[StreamSum] = self.__stock_level
        for index, per_product in enumerate(self.__stock_levels):
            record = self.__in_warehouse[index]
            area: float = (end - max(record[1], warmup)) * record[0]
            per_product.add(area)
            overall.add(area)
            dest.stock_levels[index] = per_product.result() / window
        dest.stock_level = overall.result() / window

        # the consumed runtime, measured from the call to start()
        dest.simulation_time_nanos = time_ns() - self.__start