Coverage for moptipyapps / prodsched / statistics_collector.py: 96%
113 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:40 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:40 +0000
1r"""
2A tool for collecting statistics from an MFC simulation.
4A statistics collector :class:`~StatisticsCollector` is a special
5:class:`~moptipyapps.prodsched.simulation.Listener`
6that can be plugged into a
7:class:`~moptipyapps.prodsched.simulation.Simulation`.
8During the execution of the simulation, it gathers statistics about what is
9going on. It finally stores these into a
10:class:`~moptipyapps.prodsched.statistics.Statistics` record.
11Such a record can then be used to understand the key characteristics of the
12behavior of the simulation on a given
13:class:`~moptipyapps.prodsched.instance.Instance`.
15The simulation listeners (:class:`~moptipyapps.prodsched.simulation.Listener`)
16offer a method to pipe out data from arbitrary subclasses of
17:class:`~moptipyapps.prodsched.simulation.Simulation`.
18This means that they allow us to access data in a unified way, regardless of
19which manufacturing logic or production scheduling we actually implement.
21In the case of the :class:`~StatisticsCollector` implemented here, we
22implement the :class:`~moptipyapps.prodsched.simulation.Listener`-API to
23fill a :class:`~moptipyapps.prodsched.statistics.Statistics` record with data.
24Such records offer the standard statistics that Thürer et al. used in their
25works.
26In other words, we can make such statistics available and accessible,
27regardless of how our simulation schedules the production.
29Moreover, multiple such :class:`~moptipyapps.prodsched.statistics.Statistics`
30records, filled with data from simulations over multiple different instances
31(:mod:`~moptipyapps.prodsched.instance`) can be combined in a
32:class:`~moptipyapps.prodsched.multistatistics.MultiStatistics` record.
33This record, which will comprehensively represent performance over several
34independent instances, then can be used as basis for objective functions
35(:class:`~moptipy.api.objective.Objective`) such as those given in
36:mod:`~moptipyapps.prodsched.objectives`.
38>>> instance = Instance(
39... name="test2", n_products=2, n_customers=1, n_stations=2, n_demands=5,
40... time_end_warmup=21, time_end_measure=10000,
41... routes=[[0, 1], [1, 0]],
42... demands=[[0, 0, 1, 10, 20, 90], [1, 0, 0, 5, 22, 200],
43... [2, 0, 1, 7, 30, 200], [3, 0, 1, 6, 60, 200],
44... [4, 0, 0, 125, 5, 2000]],
45... warehous_at_t0=[2, 1],
46... station_product_unit_times=[[[10.0, 50.0, 15.0, 100.0],
47... [ 5.0, 20.0, 7.0, 35.0, 4.0, 50.0]],
48... [[ 5.0, 24.0, 7.0, 80.0],
49... [ 3.0, 21.0, 6.0, 50.0,]]])
52>>> from moptipyapps.prodsched.simulation import Simulation
53>>> statistics = Statistics(instance.n_products)
54>>> collector = StatisticsCollector(instance)
55>>> collector.set_dest(statistics)
56>>> simulation = Simulation(instance, collector)
57>>> simulation.ctrl_run()
58>>> from moptipyapps.prodsched.statistics import to_stream
59>>> text = list(to_stream(statistics))
60>>> print("\n".join(text[:-1]))
61stat;total;product_0;product_1
62trp.min;229.57142857142858;455.6;229.57142857142858
63trp.mean;304.8888888888889;455.6;246.92307692307693
64trp.max;455.6;455.6;267.1666666666667
65trp.sd;97.56326157594913;0;19.50721118346466
66cwt.min;1603;2278;1603
67cwt.mean;1829.3333333333333;2278;1605
68cwt.max;2278;2278;1607
69cwt.sd;388.56187838403986;;2.8284271247461903
70fill.rate;0;0;0
71stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118
72fulfilled.rate;1;1;1
74>>> statistics_2 = Statistics(instance.n_products)
75>>> statistics_2 = statistics_2.from_stream(text)
76>>> print("\n".join(list(to_stream(statistics_2))[:-1]))
77stat;total;product_0;product_1
78trp.min;229.57142857142858;455.6;229.57142857142858
79trp.mean;304.8888888888889;455.6;246.92307692307693
80trp.max;455.6;455.6;267.1666666666667
81trp.sd;97.56326157594913;0;19.50721118346466
82cwt.min;1603;2278;1603
83cwt.mean;1829.3333333333333;2278;1605
84cwt.max;2278;2278;1607
85cwt.sd;388.56187838403986;;2.8284271247461903
86fill.rate;0;0;0
87stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118
88fulfilled.rate;1;1;1
89"""
91from time import time_ns
92from typing import Final
94from pycommons.math.stream_statistics import (
95 StreamStatistics,
96 StreamStatisticsAggregate,
97)
98from pycommons.math.streams import StreamSum
99from pycommons.types import type_error
101from moptipyapps.prodsched.instance import (
102 Demand,
103 Instance,
104)
105from moptipyapps.prodsched.simulation import Job, Listener
106from moptipyapps.prodsched.statistics import Statistics
class StatisticsCollector(Listener):
    """
    A simulation listener that gathers basic performance statistics.

    While a simulation runs, the collector accumulates production times,
    customer waiting times, counts of immediately satisfied and of fulfilled
    demands, and time-weighted stock levels. Once the simulation reports
    that it is finished, these values are written into the
    :class:`~moptipyapps.prodsched.statistics.Statistics` record that was
    registered via :meth:`set_dest`.
    """

    def __init__(self, instance: Instance) -> None:
        """
        Create the collector for a given problem instance.

        If `instance` is not an
        :class:`~moptipyapps.prodsched.instance.Instance`, the error created
        by :func:`~pycommons.types.type_error` is raised.

        :param instance: the instance whose simulation will be observed
        """
        if not isinstance(instance, Instance):
            raise type_error(instance, "instance", Instance)
        count: Final[int] = instance.n_products

        #: the destination to write to, `None` until :meth:`set_dest` is
        #: called
        self.__dest: Statistics | None = None
        #: the end of the warmup period
        self.__warmup: Final[float] = instance.time_end_warmup
        #: the end of the measurement period (`instance.time_end_measure`)
        self.__total: Final[float] = instance.time_end_measure
        #: the number of products
        self.__n_products: Final[int] = count
        #: the measurable demands per product
        self.__n_mdpb: Final[tuple[int, ...]] = (
            instance.n_measurable_demands_per_product)

        #: the internal per-product production time records
        self.__production_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(count))
        #: the internal production time record over all products
        self.__production_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of immediately satisfied demands, per product
        self.__immediately_satisfied: Final[list[int]] = [0] * count
        #: the statistics of the waiting time for not-immediately satisfied
        #: demands, per product
        self.__waiting_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            StreamStatistics.aggregate() for _ in range(count))
        #: the waiting time statistics for not-immediately satisfied
        #: demands over all products
        self.__waiting_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of fulfilled demands, per product
        self.__fulfilled: Final[list[int]] = [0] * count
        #: the accumulated (amount * duration) stock sums, per product
        self.__stock_levels: Final[tuple[StreamSum, ...]] = tuple(
            StreamSum() for _ in range(count))
        #: the accumulated (amount * duration) stock sum over all products
        self.__stock_level: Final[StreamSum] = StreamSum()
        #: per product: `[amount currently in warehouse, time since when]`
        self.__in_warehouse: Final[tuple[list[float], ...]] = tuple(
            [0.0, 0.0] for _ in range(count))
        #: the start time of the simulation (from `time_ns`), `-1` before
        #: :meth:`start` was called
        self.__start: int = -1

    def set_dest(self, dest: Statistics) -> None:
        """
        Register the statistics record that :meth:`finished` will fill.

        If `dest` is not a
        :class:`~moptipyapps.prodsched.statistics.Statistics` instance, the
        error created by :func:`~pycommons.types.type_error` is raised.

        :param dest: the destination record
        """
        if isinstance(dest, Statistics):
            self.__dest = dest
            return
        raise type_error(dest, "dest", Statistics)

    def start(self) -> None:
        """
        Reset all collected data and remember the wall-clock start time.

        :raises ValueError: if no destination record was set via
            :meth:`set_dest`
        """
        if self.__dest is None:
            raise ValueError("Need destination statistics!")
        self.__start = time_ns()

        # clear the per-product accumulators
        for produced in self.__production_times:
            produced.reset()
        for waited in self.__waiting_times:
            waited.reset()
        for stock in self.__stock_levels:
            stock.reset()
        for record in self.__in_warehouse:
            record[0] = 0.0
            record[1] = 0.0
        for index in range(self.__n_products):
            self.__immediately_satisfied[index] = 0
            self.__fulfilled[index] = 0

        # clear the accumulators spanning all products
        self.__production_time.reset()
        self.__waiting_time.reset()
        self.__stock_level.reset()

    def product_in_warehouse(
            self, time: float, product_id: int, amount: int,
            is_in_measure_period: bool) -> None:
        """
        Record that the stored amount of a product has changed.

        If the event lies in the measurement period, the amount that was in
        the warehouse until now is multiplied by the time it was stored
        there (counted from the end of the warmup period at the earliest)
        and added to the stock level sums. In any case, the new amount and
        the current time are remembered for the next event.

        :param time: the current time
        :param product_id: the product ID
        :param amount: the new absolute total amount of that product in the
            warehouse
        :param is_in_measure_period: is this event inside the measurement
            period?
        """
        record: Final[list[float]] = self.__in_warehouse[product_id]
        if is_in_measure_period:
            since: float = max(record[1], self.__warmup)
            weighted: float = (time - since) * record[0]
            self.__stock_levels[product_id].add(weighted)
            self.__stock_level.add(weighted)
        record[0], record[1] = amount, time

    def produce_at_end(
            self, time: float, station_id: int, job: Job) -> None:
        """
        Record the production time of a completed, measured job.

        The production time is the span from `job.arrival` to `time`. For a
        job with an amount greater than one, this span is divided by the
        amount and the resulting value is recorded once per unit. Jobs that
        are not measured or not completed are ignored.

        :param time: the current time
        :param station_id: the station ID (not used by this collector)
        :param job: the production job
        """
        if not job.measure or not job.completed:
            return
        amount: Final[int] = job.amount
        elapsed: float = time - job.arrival
        per_product = self.__production_times[job.product_id]
        overall = self.__production_time

        if amount <= 1:
            per_product.add(elapsed)
            overall.add(elapsed)
            return

        # amounts > 1: spread the time evenly, one record per unit
        share: Final[float] = elapsed / amount
        for _ in range(amount):
            per_product.add(share)
            overall.add(share)

    def demand_satisfied(
            self, time: float, demand: Demand) -> None:
        """
        Record that a demand has been satisfied.

        Only measured demands are considered. A demand whose arrival time
        is not before `time` counts as immediately satisfied; otherwise the
        difference between `time` and the arrival time is recorded as
        waiting time. Either way the demand counts as fulfilled.

        :param time: the time index when the demand was satisfied
        :param demand: the demand that was satisfied
        """
        if not demand.measure:
            return
        arrival: Final[float] = demand.arrival
        product: Final[int] = demand.product_id
        if arrival >= time:
            self.__immediately_satisfied[product] += 1
        else:
            waited: Final[float] = time - arrival
            self.__waiting_times[product].add(waited)
            self.__waiting_time.add(waited)
        self.__fulfilled[product] += 1

    def finished(self, time: float) -> None:
        """
        Write the collected statistics into the destination record.

        :param time: the time when we are finished (not used by this
            collector)
        :raises ValueError: if no destination record is available
        """
        dest: Final[Statistics | None] = self.__dest
        if dest is None:
            raise ValueError("Lost destination statistics record?")

        # production times, per product and overall
        for index, produced in enumerate(self.__production_times):
            dest.production_times[index] = produced.result_or_none()
        dest.production_time = self.__production_time.result_or_none()

        # immediate and fulfilled rates, relative to the measurable demands
        demands: Final[tuple[int, ...]] = self.__n_mdpb
        sum_immediate: int = 0
        sum_fulfilled: int = 0
        sum_demands: int = 0
        for index, (immediate, fulfilled) in enumerate(zip(
                self.__immediately_satisfied, self.__fulfilled)):
            n_demands = demands[index]
            dest.immediate_rates[index] = immediate / n_demands
            dest.fulfilled_rates[index] = fulfilled / n_demands
            sum_immediate += immediate
            sum_fulfilled += fulfilled
            sum_demands += n_demands
        dest.immediate_rate = sum_immediate / sum_demands
        dest.fulfilled_rate = sum_fulfilled / sum_demands

        # waiting times, per product and overall
        for index, waited in enumerate(self.__waiting_times):
            dest.waiting_times[index] = waited.result_or_none()
        dest.waiting_time = self.__waiting_time.result_or_none()

        # stock levels: account for what is still in the warehouse up to the
        # end of the measurement period, then divide by the window length
        warmup: Final[float] = self.__warmup
        end: Final[float] = self.__total
        window: Final[float] = end - warmup
        overall: Final[StreamSum] = self.__stock_level
        for index, (level, record) in enumerate(zip(
                self.__stock_levels, self.__in_warehouse)):
            remaining: float = (end - max(record[1], warmup)) * record[0]
            level.add(remaining)
            overall.add(remaining)
            dest.stock_levels[index] = level.result() / window
        dest.stock_level = overall.result() / window

        # the consumed wall-clock time since `start` was called
        dest.simulation_time_nanos = time_ns() - self.__start