Coverage for pycommons / math / streams.py: 100%
62 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:04 +0000
"""
Tools for numerical data aggregation over streams.

This module provides tools that can be used to aggregate data over streams of
numerical data.
Such tools should extend the class
:class:`~pycommons.math.streams.StreamAggregate`, which provides the tools to
:meth:`~pycommons.math.streams.StreamAggregate.add` data to the aggregation
procedure as well as to include whole sequences of data (via
:meth:`~pycommons.math.streams.StreamAggregate.update`) or to
:meth:`~pycommons.math.streams.StreamAggregate.reset` the computation.
It is recommended that subclasses implement a method `result`
returning the current result.

The class :class:`~pycommons.math.streams.StreamSum` is such a subclass of
:class:`~pycommons.math.streams.StreamAggregate`. It provides a running sum of
values over a stream of data, using a Kahan summation algorithm of the second
order. Its method
:meth:`~pycommons.math.streams.StreamSum.result` returns the current running
sum value.
"""
23from math import isfinite
24from typing import Callable, Final, Iterable
26from pycommons.math.int_math import __DBL_INT_LIMIT_N_F as _DILNF
27from pycommons.math.int_math import __DBL_INT_LIMIT_P_F as _DILPF
28from pycommons.math.int_math import try_int_add
29from pycommons.types import type_error
32class StreamAggregate:
33 """
34 The base class for stream aggregates.
36 This class provides a basic API for stream data aggregation.
37 It is implemented by :class:`StreamSum`.
38 """
40 def reset(self) -> None:
41 """
42 Reset this stream aggregate.
44 :raises NotImplementedError: because it is an abstract method
46 >>> ag = StreamAggregate()
47 >>> try:
48 ... ag.reset()
49 ... except NotImplementedError:
50 ... print("not implemented")
51 not implemented
52 """
53 raise NotImplementedError
55 def add(self, value: int | float) -> None:
56 """
57 Add a value to the aggregate.
59 :param value: the value to aggregate
61 :raises NotImplementedError: because it is an abstract method
63 >>> ag = StreamAggregate()
64 >>> try:
65 ... ag.add(1)
66 ... except NotImplementedError:
67 ... print("not implemented")
68 not implemented
69 """
70 raise NotImplementedError
72 def update(self, data: Iterable[int | float | None]) -> None:
73 """
74 Perform a stream update.
76 This function adds all the data to the stream while skipping `None`
77 values.
79 :param data: the data, i.e., a stream of values. `None` values are
80 skipped
82 >>> ag = StreamAggregate()
83 >>> ag.add = lambda d: print(str(d))
84 >>> ag.update((1, 2, 3))
85 1
86 2
87 3
88 >>> ag.update((1, None, 3))
89 1
90 3
91 """
92 ad: Final[Callable[[int | float], None]] = self.add # fast calls!
93 for v in data:
94 if v is not None:
95 ad(v)
class StreamSum(StreamAggregate):
    """
    The second-order Kahan-Babuška-Neumaier-Summation by Klein.

    Values of type `int` are accumulated exactly in a Python `int`. The same
    holds for `float` values that are integral and lie within the bounds
    `_DILNF` and `_DILPF` imported from :mod:`pycommons.math.int_math`
    (presumably the range in which integers are exactly representable as
    `float`). All other finite `float` values are added with the
    second-order algorithm of Klein [1]. That algorithm keeps a running sum
    plus a first-order and a second-order correction term. :meth:`result`
    combines the exact integer part with the floating point part.

    [1] A. Klein. A Generalized Kahan-Babuška-Summation-Algorithm.
        Computing 76:279-293. 2006. doi:10.1007/s00607-005-0139-x

    >>> stream_sum = StreamSum()
    >>> stream_sum.update([1e18, 1, 1e36, -1e36, -1e18])
    >>> stream_sum.result()
    1
    >>> stream_sum.reset()
    >>> stream_sum.update([1e18, 1, 1e36, -1e36, -1e18])
    >>> stream_sum.result()
    1
    >>> stream_sum.reset()
    >>> stream_sum.update([1e18, 1, 1e36, None, -1e36, -1e18])
    >>> stream_sum.result()
    1
    """

    def __init__(self) -> None:
        """Create the summation object."""
        #: the running integer sum (the exact part of the result)
        self.__i_sum: int = 0
        #: the running floating point sum, an internal variable
        self.__sum: float | int = 0
        #: the first-order correction term, another internal variable
        self.__cs: float | int = 0
        #: the second-order correction term, another internal variable
        self.__ccs: float | int = 0
        #: was at least one value accepted since creation or the last reset?
        self.__has_value: bool = False

    def reset(self) -> None:
        """Reset this sum to the state of a freshly created object."""
        self.__sum = 0
        self.__cs = 0
        self.__ccs = 0
        self.__i_sum = 0
        self.__has_value = False

    def add(self, value: int | float) -> None:
        """
        Add a value to the sum.

        The value is validated before any internal state is modified, so a
        rejected value leaves the sum exactly as it was.

        :param value: the value to add
        :raises TypeError: if `value` is neither an `int` nor a `float`
        :raises ValueError: if `value` is a `float` that is not finite

        >>> ss = StreamSum()
        >>> print(ss.result())
        None
        >>> ss.add(1)
        >>> ss.result()
        1
        >>> ss.add(2.0)
        >>> ss.result()
        3
        >>> ss.add(1e43)
        >>> ss.result()
        1e+43
        >>> ss.add(-1e43)
        >>> ss.result()
        3
        >>> ss.reset()
        >>> print(ss.result())
        None

        >>> try:
        ...     ss.add("x")
        ... except TypeError as te:
        ...     print(te)
        value should be an instance of any in {float, int} but is str, \
namely 'x'.

        >>> from math import inf
        >>> try:
        ...     ss.add(inf)
        ... except ValueError as ve:
        ...     print(ve)
        Value must be finite, but is inf.

        Rejected values do not count as recorded values:

        >>> print(ss.result())
        None
        """
        if isinstance(value, int):
            self.__i_sum += value  # integers are summed exactly
            self.__has_value = True
            return
        if not isinstance(value, float):
            raise type_error(value, "value", (float, int))
        if not isfinite(value):
            raise ValueError(f"Value must be finite, but is {value}.")
        # Only now is the value known to be acceptable.
        self.__has_value = True
        if _DILNF <= value <= _DILPF:
            iv: Final[int] = int(value)
            if value == iv:  # integral float: route it to the exact sum
                self.__i_sum += iv
                return
        s: int | float = self.__sum  # Get the current running sum.
        t: int | float = s + value  # Compute the new sum value.
        c: int | float = (((s - t) + value) if abs(s) >= abs(value)
                          else ((value - t) + s))  # The Neumaier tweak.
        self.__sum = t  # Store the new sum value.
        cs: int | float = self.__cs  # the current 1st-order correction
        t = cs + c  # Compute the new first-order correction term.
        cc: int | float = (((cs - t) + c) if abs(cs) >= abs(c)
                           else ((c - t) + cs))  # 2nd Neumaier tweak.
        self.__cs = t  # Store the updated first-order correction term.
        self.__ccs += cc  # Update the second-order correction.

    def add_sum(self, ss: "StreamSum") -> None:
        """
        Add another stream sum to this one.

        The state of `ss` is read completely before anything is added. This
        makes `x.add_sum(x)` well-defined. If `ss` has not recorded any value,
        this sum is left unchanged.

        :param ss: the other stream sum
        :raises TypeError: if `ss` is not a :class:`StreamSum`

        >>> ss1 = StreamSum()
        >>> ss1.update((1, 1e20))
        >>> ss2 = StreamSum()
        >>> ss2.update((5, -1e20))
        >>> ss1.add_sum(ss2)
        >>> ss1.result()
        6

        >>> ss3 = StreamSum()
        >>> ss3.update((1, 2))
        >>> ss3.add_sum(ss3)
        >>> ss3.result()
        6

        >>> try:
        ...     ss1.add_sum("x")
        ... except TypeError as te:
        ...     print(str(te)[:31])
        other sum should be an instance
        """
        if not isinstance(ss, StreamSum):
            raise type_error(ss, "other sum", StreamSum)
        if ss.__has_value:
            # Snapshot first: if `ss is self`, each `add` below would
            # otherwise modify terms that have not been read yet.
            i_sum, s, cs, ccs = ss.__i_sum, ss.__sum, ss.__cs, ss.__ccs
            self.add(i_sum)
            self.add(s)
            self.add(cs)
            self.add(ccs)

    def result(self) -> int | float | None:
        """
        Get the current result of the summation.

        The floating point part is combined as `sum + (cs + ccs)`: the two
        correction terms are added to each other first, as in the final step
        of Klein's algorithm. That part is then combined with the exact
        integer part via :func:`~pycommons.math.int_math.try_int_add`.

        :return: the current result of the summation, or `None` if no value
            was added yet
        """
        if not self.__has_value:
            return None
        return try_int_add(
            self.__i_sum, self.__sum + (self.__cs + self.__ccs))