Coverage for moptipyapps / prodsched / statistics_collector.py: 96%
113 statements
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-30 03:25 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-30 03:25 +0000
1r"""
2A tool for collecting statistics from an MFC simulation.
4A statistics collector :class:`~StatisticsCollector` is a special
5:class:`~moptipyapps.prodsched.simulation.Listener`
6that can be plugged into a
7:class:`~moptipyapps.prodsched.simulation.Simulation`.
8During the execution of the simulation, it gathers statistics about what is
9going on. It finally stores these into a
10:class:`~moptipyapps.prodsched.statistics.Statistics` record.
11Such a record can then be used to understand the key characteristics of the
12behavior of the simulation on a given
13:class:`~moptipyapps.prodsched.instance.Instance`.
15The simulation listeners (:class:`~moptipyapps.prodsched.simulation.Listener`)
16offer a method to pipe out data from arbitrary subclasses of
17:class:`~moptipyapps.prodsched.simulation.Simulation`.
18This means that they allow us to access data in a unified way, regardless of
19which manufacturing logic or production scheduling we actually implement.
21In the case of the :class:`~StatisticsCollector` implemented here, we
22implement the :class:`~moptipyapps.prodsched.simulation.Listener`-API to
23fill a :class:`~moptipyapps.prodsched.statistics.Statistics` record with data.
24Such records offer the standard statistics that Thürer et al. used in their
25works.
26In other words, we can make such statistics available and accessible,
27regardless of how our simulation schedules the production.
29Moreover, multiple such :class:`~moptipyapps.prodsched.statistics.Statistics`
30records, filled with data from simulations over multiple different instances
31(:mod:`~moptipyapps.prodsched.instance`) can be combined in a
32:class:`~moptipyapps.prodsched.multistatistics.MultiStatistics` record.
33This record, which will comprehensively represent performance over several
34independent instances, then can be used as basis for objective functions
35(:class:`~moptipy.api.objective.Objective`) such as those given in
36:mod:`~moptipyapps.prodsched.objectives`.
38>>> instance = Instance(
39... name="test2", n_products=2, n_customers=1, n_stations=2, n_demands=5,
40... time_end_warmup=21, time_end_measure=10000,
41... routes=[[0, 1], [1, 0]],
42... demands=[[0, 0, 1, 10, 20, 90], [1, 0, 0, 5, 22, 200],
43... [2, 0, 1, 7, 30, 200], [3, 0, 1, 6, 60, 200],
44... [4, 0, 0, 125, 5, 2000]],
45... warehous_at_t0=[2, 1],
46... station_product_unit_times=[[[10.0, 50.0, 15.0, 100.0],
47... [ 5.0, 20.0, 7.0, 35.0, 4.0, 50.0]],
48... [[ 5.0, 24.0, 7.0, 80.0],
49... [ 3.0, 21.0, 6.0, 50.0,]]])
52>>> from moptipyapps.prodsched.simulation import Simulation
53>>> statistics = Statistics(instance.n_products)
54>>> collector = StatisticsCollector(instance)
55>>> collector.set_dest(statistics)
56>>> simulation = Simulation(instance, collector)
57>>> simulation.ctrl_run()
58>>> print("\n".join(str(statistics).split("\n")[:-1]))
59stat;total;product_0;product_1
60trp.min;229.57142857142858;455.6;229.57142857142858
61trp.mean;304.8888888888889;455.6;246.92307692307693
62trp.max;455.6;455.6;267.1666666666667
63trp.sd;97.56326157594913;0;19.50721118346466
64cwt.min;1603;2278;1603
65cwt.mean;1829.3333333333333;2278;1605
66cwt.max;2278;2278;1607
67cwt.sd;388.56187838403986;;2.8284271247461903
68fill.rate;0;0;0
69stocklevel.mean;0.6078765407355446;0.4497444633730835;0.15813207736246118
70fulfilled.rate;1;1;1
71"""
73from time import time_ns
74from typing import Final
76from pycommons.math.stream_statistics import (
77 StreamStatistics,
78 StreamStatisticsAggregate,
79)
80from pycommons.math.streams import StreamSum
81from pycommons.types import type_error
83from moptipyapps.prodsched.instance import (
84 Demand,
85 Instance,
86)
87from moptipyapps.prodsched.simulation import Job, Listener
88from moptipyapps.prodsched.statistics import Statistics
class StatisticsCollector(Listener):
    """
    A simulation listener that gathers basic statistics.

    While the simulation runs, the collector accumulates

    - the per-unit production times of completed, measured jobs,
    - the waiting times of measured demands that were not satisfied
      immediately,
    - the numbers of immediately satisfied and of fulfilled measured
      demands, and
    - the time-weighted amounts of products stored in the warehouse,

    each of them per product and in total. When the simulation has
    finished, the results are written into the
    :class:`~moptipyapps.prodsched.statistics.Statistics` record that was
    registered via :meth:`set_dest`.
    """

    def __init__(self, instance: Instance) -> None:
        """
        Create the statistics collector for a given instance.

        If `instance` is not an
        :class:`~moptipyapps.prodsched.instance.Instance`, the error
        created by :func:`~pycommons.types.type_error` is raised.

        :param instance: the instance whose simulation is observed
        """
        if not isinstance(instance, Instance):
            raise type_error(instance, "instance", Instance)

        #: the destination to write to.
        self.__dest: Statistics | None = None
        #: the end of the warmup period
        self.__warmup: Final[float] = instance.time_end_warmup
        #: the total time window length
        self.__total: Final[float] = instance.time_end_measure
        #: the number of products
        self.__n_products: Final[int] = instance.n_products
        #: the measurable demands per product
        self.__n_mdpb: Final[tuple[int, ...]] = (
            instance.n_measurable_demands_per_product)

        count: Final[int] = self.__n_products

        #: the internal per-product production time records
        self.__production_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            [StreamStatistics.aggregate() for _ in range(count)])
        #: the internal total production time
        self.__production_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of measured demands that were satisfied immediately,
        #: per product
        self.__immediately_satisfied: Final[list[int]] = [
            0 for _ in range(count)]
        #: the statistics of the waiting time for not-immediately satisfied
        #: products, per product
        self.__waiting_times: Final[tuple[
            StreamStatisticsAggregate[StreamStatistics], ...]] = tuple(
            [StreamStatistics.aggregate() for _ in range(count)])
        #: the total waiting time for non-immediately satisfied products
        self.__waiting_time: Final[StreamStatisticsAggregate[
            StreamStatistics]] = StreamStatistics.aggregate()
        #: the number of fulfilled jobs, per-product
        self.__fulfilled: Final[list[int]] = [0 for _ in range(count)]
        #: the stock levels on a per-product basis
        self.__stock_levels: Final[tuple[StreamSum, ...]] = tuple(
            [StreamSum() for _ in range(count)])
        #: the total stock level sum
        self.__stock_level: Final[StreamSum] = StreamSum()
        #: per product, a two-element list: the amount currently in the
        #: warehouse (index 0) and the time since when it is there (index 1)
        self.__in_warehouse: Final[tuple[list[float], ...]] = tuple(
            [[0.0, 0.0] for _ in range(count)])
        #: the start time of the simulation as returned by `time_ns`,
        #: or `-1` as long as :meth:`start` was not invoked
        self.__start: int = -1

    def set_dest(self, dest: Statistics) -> None:
        """
        Register the statistics record that will receive the results.

        If `dest` is not a
        :class:`~moptipyapps.prodsched.statistics.Statistics` record, the
        error created by :func:`~pycommons.types.type_error` is raised.

        :param dest: the destination record
        """
        if isinstance(dest, Statistics):
            self.__dest = dest
            return
        raise type_error(dest, "dest", Statistics)

    def start(self) -> None:
        """
        Reset all collected data and remember the current clock time.

        :raises ValueError: if no destination record was set before
        """
        if self.__dest is None:
            raise ValueError("Need destination statistics!")
        self.__start = time_ns()

        production: Final = self.__production_times
        immediate: Final[list[int]] = self.__immediately_satisfied
        waiting: Final = self.__waiting_times
        fulfilled: Final[list[int]] = self.__fulfilled
        stock: Final[tuple[StreamSum, ...]] = self.__stock_levels
        warehouse: Final[tuple[list[float], ...]] = self.__in_warehouse

        # first clear everything that is kept on a per-product basis
        for idx in range(self.__n_products):
            production[idx].reset()
            immediate[idx] = 0
            waiting[idx].reset()
            fulfilled[idx] = 0
            stock[idx].reset()
            record = warehouse[idx]
            record[0] = record[1] = 0.0

        # then clear the accumulators spanning all products
        self.__production_time.reset()
        self.__waiting_time.reset()
        self.__stock_level.reset()

    def product_in_warehouse(
            self, time: float, product_id: int, amount: int,
            is_in_measure_period: bool) -> None:
        """
        Record that the stored amount of a product has changed.

        :param time: the current time
        :param product_id: the product ID
        :param amount: the new absolute total amount of that product in the
            warehouse
        :param is_in_measure_period: is this event inside the measurement
            period?
        """
        record: Final[list[float]] = self.__in_warehouse[product_id]
        if is_in_measure_period:
            old_amount, since = record
            # The previous amount was in stock from `since` on, but only
            # the part after the warmup period is counted.
            duration: float = time - max(since, self.__warmup)
            weighted: float = duration * old_amount
            self.__stock_levels[product_id].add(weighted)
            self.__stock_level.add(weighted)
        # remember the new amount and the time at which it was reached
        record[0], record[1] = amount, time

    def produce_at_end(
            self, time: float, station_id: int, job: Job) -> None:
        """
        Record that a production step of a job has ended at a station.

        Only jobs that are both marked for measurement and completed are
        recorded. The time since the arrival of the job is divided evenly
        among the produced units and registered once per unit.

        :param time: the current time
        :param station_id: the station ID
        :param job: the production job
        """
        if not job.measure or not job.completed:
            return

        units: Final[int] = job.amount
        per_unit: float = time - job.arrival
        repetitions: int = 1
        if units > 1:  # several units: each unit gets an equal share
            per_unit /= units
            repetitions = units

        by_product: Final = self.__production_times[job.product_id]
        overall: Final = self.__production_time
        for _ in range(repetitions):
            by_product.add(per_unit)
            overall.add(per_unit)

    def demand_satisfied(
            self, time: float, demand: Demand) -> None:
        """
        Record that a demand has been satisfied.

        Demands that are not marked for measurement are ignored.

        :param time: the time index when the demand was satisfied
        :param demand: the demand that was satisfied
        """
        if not demand.measure:
            return

        arrival: Final[float] = demand.arrival
        product: Final[int] = demand.product_id
        if time > arrival:
            # the customer had to wait: record for how long
            waited: Final[float] = time - arrival
            self.__waiting_times[product].add(waited)
            self.__waiting_time.add(waited)
        else:
            # satisfied right at (or not after) its arrival time
            self.__immediately_satisfied[product] += 1
        self.__fulfilled[product] += 1

    def finished(self, time: float) -> None:
        """
        Write the collected statistics into the destination record.

        The stock levels are integrated up to the end of the measurement
        period that was taken from the instance; the `time` argument is
        not used by this implementation.

        :param time: the time when we are finished
        :raises ValueError: if there is no destination record
        """
        dest: Final[Statistics | None] = self.__dest
        if dest is None:
            raise ValueError("Lost destination statistics record?")

        # production times: per product first, then over all products
        for idx, aggregate in enumerate(self.__production_times):
            dest.production_times[idx] = aggregate.result_or_none()
        dest.production_time = self.__production_time.result_or_none()

        # rates: counts divided by the number of measurable demands
        measurable: Final[tuple[int, ...]] = self.__n_mdpb
        all_immediate: int = 0
        all_fulfilled: int = 0
        all_measurable: int = 0
        for idx, (immediate, fulfilled) in enumerate(zip(
                self.__immediately_satisfied, self.__fulfilled)):
            expected: int = measurable[idx]
            dest.immediate_rates[idx] = immediate / expected
            dest.fulfilled_rates[idx] = fulfilled / expected
            all_immediate += immediate
            all_fulfilled += fulfilled
            all_measurable += expected
        dest.immediate_rate = all_immediate / all_measurable
        dest.fulfilled_rate = all_fulfilled / all_measurable

        # waiting times: per product first, then over all products
        for idx, aggregate in enumerate(self.__waiting_times):
            dest.waiting_times[idx] = aggregate.result_or_none()
        dest.waiting_time = self.__waiting_time.result_or_none()

        # Stock levels: account for the amount that stays in the warehouse
        # until the end of the measurement period, then divide by the
        # length of the window between warmup end and measurement end.
        end: Final[float] = self.__total
        warmup: Final[float] = self.__warmup
        window: Final[float] = end - warmup
        overall: Final[StreamSum] = self.__stock_level
        for idx, (level, (amount, since)) in enumerate(zip(
                self.__stock_levels, self.__in_warehouse)):
            remainder: float = (end - max(since, warmup)) * amount
            level.add(remainder)
            overall.add(remainder)
            dest.stock_levels[idx] = level.result() / window
        dest.stock_level = overall.result() / window

        # the clock time consumed since `start` was invoked
        dest.simulation_time_nanos = time_ns() - self.__start