Coverage for moptipyapps / prodsched / statistics.py: 93%
137 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:40 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:40 +0000
1"""
2A statistics record for the simulation.
4This module provides a record with statistics derived from one single
5MFC simulation. It can store values such as the mean fill rate or the
6mean stock level.
7Such statistics records are filled in by instances of the
8:class:`~moptipyapps.prodsched.statistics_collector.StatisticsCollector`
9plugged into the
10:class:`~moptipyapps.prodsched.simulation.Simulation`.
11"""
13from itertools import chain
14from typing import Callable, Final, Generator, Iterable, Self
16from moptipy.utils.logger import KEY_VALUE_SEPARATOR
17from pycommons.io.csv import CSV_SEPARATOR, SCOPE_SEPARATOR
18from pycommons.math.stream_statistics import (
19 KEY_MAXIMUM,
20 KEY_MEAN_ARITH,
21 KEY_MINIMUM,
22 KEY_STDDEV,
23 StreamStatistics,
24)
25from pycommons.strings.string_conv import (
26 num_or_none_to_str,
27 str_to_num_or_none,
28)
29from pycommons.types import check_int_range, type_error
#: the header of the first column, which holds the key of each row
COL_STAT: Final[str] = "stat"
#: the header of the second column, which holds the value over all products
COL_TOTAL: Final[str] = "total"
#: the key suffix used by the rows that hold a rate (a fraction)
KEY_RATE: Final[str] = "rate"
#: the prefix of the per-product column headers; the product index follows
COL_PRODUCT_PREFIX: Final[str] = "product_"
#: the key prefix of the production time (TRP) rows
ROW_TRP: Final[str] = "trp"
#: the key of the fill rate row
ROW_FILL_RATE: Final[str] = f"fill{SCOPE_SEPARATOR}{KEY_RATE}"
#: the key prefix of the waiting time (CWT) rows
ROW_CWT: Final[str] = "cwt"
#: the key of the mean stock level row
ROW_STOCK_LEVEL_MEAN: Final[str] = \
    f"stocklevel{SCOPE_SEPARATOR}{KEY_MEAN_ARITH}"
#: the key of the fulfilled rate row
ROW_FULFILLED_RATE: Final[str] = f"fulfilled{SCOPE_SEPARATOR}{KEY_RATE}"
#: the prefix of the line holding the simulation time; this is not a CSV
#: row: the prefix is directly followed by the time in seconds
ROW_SIMULATION_TIME: Final[str] = \
    f"time{SCOPE_SEPARATOR}s{KEY_VALUE_SEPARATOR}"

#: the statistics that we will print for the TRP and CWT rows: pairs of
#: the key suffix and the getter extracting that value from a
#: :class:`StreamStatistics` record
_STATS: Final[tuple[tuple[str, Callable[[
    StreamStatistics], int | float | None]], ...]] = (
    (KEY_MINIMUM, StreamStatistics.getter_or_none(KEY_MINIMUM)),
    (KEY_MEAN_ARITH, StreamStatistics.getter_or_none(KEY_MEAN_ARITH)),
    (KEY_MAXIMUM, StreamStatistics.getter_or_none(KEY_MAXIMUM)),
    (KEY_STDDEV, StreamStatistics.getter_or_none(KEY_STDDEV)))
63class Statistics:
64 """
65 A statistics record based on production scheduling.
67 It provides the following statistics:
69 - :attr:`~immediate_rates`: The per-product-fillrate, i.e., the fraction
70 of demands of a given product that were immediately fulfilled when
71 arriving in the system (i.e., that were fulfilled by using product that
72 was available in the warehouse/in stock).
73 Higher values are good.
74 - :attr:`~immediate_rate`: The overall fillrate, i.e., the total fraction
75 of demands that were immediately fulfilled upon arrival in the system
76 over all demands. That is, this is the fraction of demands that were
77 fulfilled by using product that was available in the warehouse/in stock.
78 Higher values are good.
79 - :attr:`~waiting_times`: The per-product waiting times ("CWT") for the
80 demands that came in but could *not* immediately be fulfilled. These are
81 the demands for a given product that were, so to say, not covered by the
82 fillrate/:attr:`~immediate_rate`. If all demands of a product could
83 immediately be satisfied, then this is `None`.
84 Otherwise, smaller values are good.
85 - :attr:`~waiting_time`: The overall waiting times ("CWT") for the demands
86 that came in but could *not* immediately be fulfilled. These are all the
87 demands for a given product that were, so to say, not covered by the
88 fillrate/:attr:`~immediate_rate`. If all demands could immediately be
89 satisfied, then this is `None`.
90 Otherwise, smaller values are good.
91 - :attr:`~production_times`: The per-product times that producing one unit
92 of the product takes from the moment that a production job is created
93 until it is completed. Smaller values of this "TRP" are better.
94 - :attr:`~production_time`: The overall statistics on the times that
95 producing one unit of any product takes from the moment that a
96 production job is created until it is completed. Smaller values this
97 "TRP" are better.
98 - :attr:`~fulfilled_rates`: The per-product fraction of demands that were
99 satisfied. Demands for a product may remain unsatisfied if they have not
100 been satisfied by the end of the simulation period. Larger values are
101 better.
102 - :attr:`~fulfilled_rate`: The fraction of demands that were satisfied.
103 Demands may remain unsatisfied if they have not been satisfied by the
104 end of the simulation period. Larger values are better.
105 - :attr:`~stock_levels`: The average amount of a given product in the
106 warehouse averaged over the simulation time. Smaller values are better.
107 - :attr:`~stock_level`: The total average amount units of any product in
108 the warehouse averaged over the simulation time. Smaller values are
109 better.
110 - :attr:`~simulation_time_nanos`: The total time that the simulation took,
111 measured in nanoseconds.
113 Instances of this class are filled by
114 :class:`~moptipyapps.prodsched.statistics_collector.StatisticsCollector`
115 objects plugged into the
116 :class:`~moptipyapps.prodsched.simulation.Simulation`.
117 """
119 def __init__(self, n_products: int) -> None:
120 """
121 Create the statistics record for a given number of products.
123 :param n_products: the number of products
124 """
125 check_int_range(n_products, "n_products", 1, 1_000_000_000)
126 #: the production time (TRP) statistics per-product
127 self.production_times: Final[list[
128 StreamStatistics | None]] = [None] * n_products
129 #: the overall production time (TRP) statistics
130 self.production_time: StreamStatistics | None = None
131 #: the fraction of demands that were immediately satisfied,
132 #: on a per-product basis, i.e., the fillrate
133 self.immediate_rates: Final[list[int | float | None]] = (
134 [None] * n_products)
135 #: the overall fraction of immediately satisfied demands, i.e.,
136 #: the fillrate
137 self.immediate_rate: int | float | None = None
138 #: the average waiting time for all demands that were not immediately
139 #: satisfied -- only counting demands that were actually satisfied,
140 #: i.e., the CWT
141 self.waiting_times: Final[list[
142 StreamStatistics | None]] = [None] * n_products
143 #: the overall waiting time for all demands that were not immediately
144 #: satisfied -- only counting demands that were actually satisfied,
145 #: i.e., the CWT
146 self.waiting_time: StreamStatistics | None = None
147 #: the fraction of demands that were fulfilled, on a per-product basis
148 self.fulfilled_rates: Final[list[
149 int | float | None]] = [None] * n_products
150 #: the fraction of demands that were fulfilled overall
151 self.fulfilled_rate: int | float | None = None
152 #: the average stock level, on a per-product basis
153 self.stock_levels: Final[list[
154 int | float | None]] = [None] * n_products
155 #: the overall average stock level
156 self.stock_level: int | float | None = None
157 #: the nanoseconds used by the simulation
158 self.simulation_time_nanos: int | float | None = None
160 def __str__(self) -> str:
161 """Convert this object to a string."""
162 return "\n".join(to_stream(self))
164 def clear(self) -> None:
165 """Clear all the data."""
166 n: Final[int] = list.__len__(self.production_times)
167 if n <= 0:
168 raise ValueError("Huh?")
169 for i in range(n):
170 self.production_times[i] = None
171 self.immediate_rates[i] = None
172 self.waiting_times[i] = None
173 self.fulfilled_rates[i] = None
174 self.stock_levels[i] = None
176 self.production_time = None
177 self.immediate_rate = None
178 self.waiting_time = None
179 self.fulfilled_rate = None
180 self.stock_level = None
181 self.simulation_time_nanos = None
183 def copy_from(self, stat: "Statistics") -> None:
184 """
185 Copy the contents of another statistics record.
187 :param stat: the other statistics record
188 """
189 if not isinstance(stat, Statistics):
190 raise type_error(stat, "stat", Statistics)
191 self.production_times[:] = stat.production_times
192 self.production_time = stat.production_time
193 self.immediate_rates[:] = stat.immediate_rates
194 self.immediate_rate = stat.immediate_rate
195 self.waiting_times[:] = stat.waiting_times
196 self.waiting_time = stat.waiting_time
197 self.fulfilled_rates[:] = stat.fulfilled_rates
198 self.fulfilled_rate = stat.fulfilled_rate
199 self.stock_levels[:] = stat.stock_levels
200 self.stock_level = stat.stock_level
201 self.simulation_time_nanos = stat.simulation_time_nanos
203 def from_stream(self, stream: Iterable[str]) -> Self:
204 """
205 Load the data from a stream.
207 Notice: The `n` values of the statistics records cannot be loaded.
208 They will be lost and just set to some more or less random number.
210 :param stream: the stream of data
211 :return: this object
212 """
213 self.clear()
215 n: Final[int] = list.__len__(self.production_times)
216 if n <= 0:
217 raise ValueError("Huh?")
219 keys: Final[set[str]] = {
220 f"{key}{SCOPE_SEPARATOR}{the_stat[0]}"
221 for key in (ROW_TRP, ROW_CWT) for the_stat in _STATS}
222 keys.update((ROW_FILL_RATE, ROW_FULFILLED_RATE,
223 ROW_STOCK_LEVEL_MEAN))
224 sim_time_key: Final[str] = ROW_SIMULATION_TIME
226 data: dict[str, list[int | float | None]] = {}
227 sim_time: int | None = None
228 for srow in stream:
229 row = str.strip(srow)
230 if row.startswith(sim_time_key):
231 sim_time = check_int_range(
232 round(float(row[str.__len__(
233 sim_time_key):]) * 1_000_000_000),
234 sim_time_key, 0, 1_000_000_000_000_000_000_000_000)
235 if set.__len__(keys) <= 0:
236 break
237 continue
239 cols: list[str] = str.split(srow, CSV_SEPARATOR)
240 key: str = cols[0]
241 if (list.__len__(cols) <= (n + 1)) or (key not in keys):
242 continue
243 if key in data:
244 raise ValueError(f"Duplicate key '{key}'.")
245 data[key] = [str_to_num_or_none(cols[i]) for i in range(1, n + 2)]
246 keys.remove(key)
247 if (set.__len__(keys) <= 0) and (sim_time is not None):
248 break
250 if set.__len__(keys) > 0:
251 raise ValueError(f"Missing keys: {keys}")
252 if sim_time is None:
253 raise ValueError(f"Did not find key '{sim_time_key}'.")
255 self.simulation_time_nanos = sim_time
256 self.production_time = _split_data_stat(
257 data, ROW_TRP, self.production_times)
258 self.waiting_time = _split_data_stat(
259 data, ROW_CWT, self.waiting_times)
261 vals: list[int | float | None] = data[ROW_FILL_RATE]
262 self.immediate_rate = vals[0]
263 self.immediate_rates[:] = vals[1:]
265 vals = data[ROW_STOCK_LEVEL_MEAN]
266 self.stock_level = vals[0]
267 self.stock_levels[:] = vals[1:]
269 vals = data[ROW_FULFILLED_RATE]
270 self.fulfilled_rate = vals[0]
271 self.fulfilled_rates[:] = vals[1:]
273 return self
def _split_data_stat(data: dict[str, list[int | float | None]],
                     key: str,
                     dest: list[StreamStatistics | None]) \
        -> StreamStatistics | None:
    """
    Rebuild the statistics records of one group of rows.

    The rows for minimum, mean, maximum, and standard deviation of the given
    key prefix are combined column by column. Column `0` holds the overall
    value, the columns after it hold the per-product values.

    :param data: the loaded rows, indexed by their row key
    :param key: the key prefix of the row group
    :param dest: the destination list receiving the per-product statistics
    :return: the overall statistics, if any
    """
    prefix: Final[str] = f"{key}{SCOPE_SEPARATOR}"
    min_key: Final[str] = f"{prefix}{KEY_MINIMUM}"
    mean_key: Final[str] = f"{prefix}{KEY_MEAN_ARITH}"
    max_key: Final[str] = f"{prefix}{KEY_MAXIMUM}"
    sd_key: Final[str] = f"{prefix}{KEY_STDDEV}"
    # Product `slot` lives in column `slot + 1`, as column 0 is the total.
    for slot in range(list.__len__(dest)):
        dest[slot] = __stream_stats(
            data, min_key, mean_key, max_key, sd_key, slot + 1)
    return __stream_stats(data, min_key, mean_key, max_key, sd_key, 0)
def __stream_stats(data: dict[str, list[int | float | None]],
                   key_min: str, key_mean: str, key_max: str, key_sd: str,
                   i: int) -> StreamStatistics | None:
    """
    Assemble one stream statistics record from column `i` of four rows.

    :param data: the loaded rows, indexed by their row key
    :param key_min: the key of the row with the minima
    :param key_mean: the key of the row with the arithmetic means
    :param key_max: the key of the row with the maxima
    :param key_sd: the key of the row with the standard deviations
    :param i: the column index
    :return: the statistics, or `None` if minimum or maximum are missing
    :raises ValueError: if minimum and maximum exist but the mean does not
    """
    lowest, average, highest, spread = (
        data[row_key][i] for row_key in (key_min, key_mean, key_max, key_sd))
    if (lowest is not None) and (highest is not None):
        if average is None:
            raise ValueError(
                f"Invalid mean {average} for min={lowest}, max={highest}!")
        # The real sample count is not stored, so a stand-in is used: one
        # sample if there is no standard deviation, otherwise 100.
        count: Final[int] = 100 if spread is not None else 1
        return StreamStatistics(
            n=count, minimum=lowest, mean_arith=average,
            maximum=highest, stddev=spread)
    return None
def to_stream(stats: Statistics) -> Generator[str, None, None]:
    """
    Write a statistics record to a stream.

    The first line is the header row. It is followed by one row per
    statistic in `_STATS` for the production times (TRP) and for the waiting
    times (CWT), then by the fill rate row, the mean stock level row, and
    the fulfilled rate row. In each row, the key is followed by the overall
    value and then by one value per product. The last line holds the
    simulation time converted from nanoseconds to seconds.

    A simulation time of `None` (as in a fresh or cleared record) is passed
    through `num_or_none_to_str` like all other missing values instead of
    raising a :class:`TypeError`.

    :param stats: the statistics record
    :return: the stream of data
    """
    n_products: Final[int] = list.__len__(stats.production_times)
    nts: Final[Callable[[int | float | None], str]] = num_or_none_to_str

    yield str.join(CSV_SEPARATOR, chain((
        COL_STAT, COL_TOTAL), (f"{COL_PRODUCT_PREFIX}{i}" for i in range(
            n_products))))

    for key, alle, single in (
            (ROW_TRP, stats.production_times, stats.production_time),
            (ROW_CWT, stats.waiting_times, stats.waiting_time)):
        for stat, call in _STATS:
            yield str.join(CSV_SEPARATOR, chain((
                f"{key}{SCOPE_SEPARATOR}{stat}", nts(call(single))), (
                map(nts, map(call, alle)))))
    yield str.join(CSV_SEPARATOR, chain((
        ROW_FILL_RATE, nts(stats.immediate_rate)), (
        map(nts, stats.immediate_rates))))
    yield str.join(CSV_SEPARATOR, chain((
        ROW_STOCK_LEVEL_MEAN, nts(stats.stock_level)), (
        map(nts, stats.stock_levels))))
    yield str.join(CSV_SEPARATOR, chain((
        ROW_FULFILLED_RATE, nts(stats.fulfilled_rate)), (
        map(nts, stats.fulfilled_rates))))

    # Only divide if there is a time: `None / int` would raise a TypeError.
    sim_nanos: Final[int | float | None] = stats.simulation_time_nanos
    sim_seconds: Final[int | float | None] = (
        None if sim_nanos is None else sim_nanos / 1_000_000_000)
    yield f"{ROW_SIMULATION_TIME}{nts(sim_seconds)}"